mirror of
https://github.com/corda/corda.git
synced 2025-06-18 23:28:21 +00:00
CORDA-847 - AMQP RPC
* Client and server support for amqp * Observable (and supporting) serialisers Unit Tests * Fixing tests * Test fixes * CORDA-847 - Update api doc with additon of @CordaSerializable annotation * TestFixes * review comments * TestFixes * Test Fix * Test Fix * Test Fix * Test Fix * Test Fix * Test Fix * TestFix * Test Fix * Review Comments
This commit is contained in:
@ -76,10 +76,16 @@ public final class net.corda.core.concurrent.ConcurrencyUtils extends java.lang.
|
|||||||
@NotNull
|
@NotNull
|
||||||
public static final String shortCircuitedTaskFailedMessage = "Short-circuited task failed:"
|
public static final String shortCircuitedTaskFailedMessage = "Short-circuited task failed:"
|
||||||
##
|
##
|
||||||
|
<<<<<<< HEAD
|
||||||
public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future
|
public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future
|
||||||
public abstract void then(kotlin.jvm.functions.Function1<? super net.corda.core.concurrent.CordaFuture<V>, ? extends W>)
|
public abstract void then(kotlin.jvm.functions.Function1<? super net.corda.core.concurrent.CordaFuture<V>, ? extends W>)
|
||||||
@NotNull
|
@NotNull
|
||||||
public abstract java.util.concurrent.CompletableFuture<V> toCompletableFuture()
|
public abstract java.util.concurrent.CompletableFuture<V> toCompletableFuture()
|
||||||
|
=======
|
||||||
|
@net.corda.core.serialization.CordaSerializable public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future
|
||||||
|
public abstract void then(kotlin.jvm.functions.Function1)
|
||||||
|
@org.jetbrains.annotations.NotNull public abstract concurrent.CompletableFuture toCompletableFuture()
|
||||||
|
>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation
|
||||||
##
|
##
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
public final class net.corda.core.context.Actor extends java.lang.Object
|
public final class net.corda.core.context.Actor extends java.lang.Object
|
||||||
@ -2615,8 +2621,12 @@ public final class net.corda.core.messaging.DataFeed extends java.lang.Object
|
|||||||
public int hashCode()
|
public int hashCode()
|
||||||
public String toString()
|
public String toString()
|
||||||
##
|
##
|
||||||
|
<<<<<<< HEAD
|
||||||
@DoNotImplement
|
@DoNotImplement
|
||||||
public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable
|
public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable
|
||||||
|
=======
|
||||||
|
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable
|
||||||
|
>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation
|
||||||
public abstract void close()
|
public abstract void close()
|
||||||
@NotNull
|
@NotNull
|
||||||
public abstract net.corda.core.flows.StateMachineRunId getId()
|
public abstract net.corda.core.flows.StateMachineRunId getId()
|
||||||
@ -2642,8 +2652,12 @@ public final class net.corda.core.messaging.FlowHandleImpl extends java.lang.Obj
|
|||||||
public int hashCode()
|
public int hashCode()
|
||||||
public String toString()
|
public String toString()
|
||||||
##
|
##
|
||||||
|
<<<<<<< HEAD
|
||||||
@DoNotImplement
|
@DoNotImplement
|
||||||
public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle
|
public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle
|
||||||
|
=======
|
||||||
|
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle
|
||||||
|
>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation
|
||||||
public abstract void close()
|
public abstract void close()
|
||||||
@NotNull
|
@NotNull
|
||||||
public abstract rx.Observable<String> getProgress()
|
public abstract rx.Observable<String> getProgress()
|
||||||
|
@ -32,9 +32,9 @@ class BlacklistKotlinClosureTest {
|
|||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
driver(DriverParameters(startNodesInProcess = true)) {
|
||||||
val rpc = startNode(providedName = ALICE_NAME).getOrThrow().rpc
|
val rpc = startNode(providedName = ALICE_NAME).getOrThrow().rpc
|
||||||
val packet = Packet { EVIL }
|
val packet = Packet { EVIL }
|
||||||
assertThatExceptionOfType(KryoException::class.java)
|
assertThatExceptionOfType(RPCException::class.java)
|
||||||
.isThrownBy { rpc.startFlow(::FlowC, packet) }
|
.isThrownBy { rpc.startFlow(::FlowC, packet) }
|
||||||
.withMessageContaining("is not annotated or on the whitelist, so cannot be used in serialization")
|
.withMessageContaining("is not on the whitelist or annotated with @CordaSerializable")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||||
import net.corda.core.context.Actor
|
import net.corda.core.context.Actor
|
||||||
@ -11,7 +11,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
import net.corda.nodeapi.ConnectionDirection
|
||||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||||
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
|
import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -111,7 +111,9 @@ class CordaRPCClient private constructor(
|
|||||||
private val haAddressPool: List<NetworkHostAndPort> = emptyList()
|
private val haAddressPool: List<NetworkHostAndPort> = emptyList()
|
||||||
) {
|
) {
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null)
|
constructor(hostAndPort: NetworkHostAndPort,
|
||||||
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default())
|
||||||
|
: this(hostAndPort, configuration, null)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
|
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
|
||||||
@ -146,7 +148,7 @@ class CordaRPCClient private constructor(
|
|||||||
effectiveSerializationEnv
|
effectiveSerializationEnv
|
||||||
} catch (e: IllegalStateException) {
|
} catch (e: IllegalStateException) {
|
||||||
try {
|
try {
|
||||||
KryoClientSerializationScheme.initialiseSerialization(classLoader)
|
AMQPClientSerializationScheme.initialiseSerialization()
|
||||||
} catch (e: IllegalStateException) {
|
} catch (e: IllegalStateException) {
|
||||||
// Race e.g. two of these constructed in parallel, ignore.
|
// Race e.g. two of these constructed in parallel, ignore.
|
||||||
}
|
}
|
||||||
@ -158,12 +160,12 @@ class CordaRPCClient private constructor(
|
|||||||
RPCClient(
|
RPCClient(
|
||||||
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
|
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
|
||||||
configuration,
|
configuration,
|
||||||
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT)
|
if (classLoader != null) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else AMQP_RPC_CLIENT_CONTEXT)
|
||||||
} else {
|
} else {
|
||||||
RPCClient(haAddressPool,
|
RPCClient(haAddressPool,
|
||||||
sslConfiguration,
|
sslConfiguration,
|
||||||
configuration,
|
configuration,
|
||||||
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT)
|
if (classLoader != null) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else AMQP_RPC_CLIENT_CONTEXT)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
|
|||||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||||
import net.corda.client.rpc.RPCException
|
import net.corda.client.rpc.RPCException
|
||||||
import net.corda.client.rpc.RPCSinceVersion
|
import net.corda.client.rpc.RPCSinceVersion
|
||||||
import net.corda.client.rpc.internal.serialization.kryo.RpcClientObservableSerializer
|
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
|
||||||
import net.corda.core.context.Actor
|
import net.corda.core.context.Actor
|
||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
import net.corda.core.context.Trace.InvocationId
|
import net.corda.core.context.Trace.InvocationId
|
||||||
|
@ -0,0 +1,63 @@
|
|||||||
|
package net.corda.client.rpc.internal.serialization.amqp
|
||||||
|
|
||||||
|
import net.corda.core.cordapp.Cordapp
|
||||||
|
import net.corda.core.serialization.ClassWhitelist
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.core.serialization.SerializationCustomSerializer
|
||||||
|
import net.corda.core.serialization.internal.SerializationEnvironment
|
||||||
|
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||||
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
|
import net.corda.nodeapi.internal.serialization.*
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.custom.RxNotificationSerializer
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation.
|
||||||
|
* This scheme is for use by the RPC Client calls.
|
||||||
|
*/
|
||||||
|
class AMQPClientSerializationScheme(
|
||||||
|
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
|
||||||
|
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
|
||||||
|
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
|
||||||
|
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, ConcurrentHashMap())
|
||||||
|
|
||||||
|
@Suppress("UNUSED")
|
||||||
|
constructor() : this(emptySet(), ConcurrentHashMap())
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
/** Call from main only. */
|
||||||
|
fun initialiseSerialization() {
|
||||||
|
nodeSerializationEnv = createSerializationEnv()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun createSerializationEnv(): SerializationEnvironment {
|
||||||
|
return SerializationEnvironmentImpl(
|
||||||
|
SerializationFactoryImpl().apply {
|
||||||
|
registerScheme(AMQPClientSerializationScheme(emptyList()))
|
||||||
|
},
|
||||||
|
storageContext = AMQP_STORAGE_CONTEXT,
|
||||||
|
p2pContext = AMQP_P2P_CONTEXT,
|
||||||
|
rpcClientContext = AMQP_RPC_CLIENT_CONTEXT,
|
||||||
|
rpcServerContext = AMQP_RPC_SERVER_CONTEXT)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase) =
|
||||||
|
magic == amqpMagic && (
|
||||||
|
target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
|
||||||
|
|
||||||
|
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||||
|
return SerializerFactory(context.whitelist, ClassLoader.getSystemClassLoader()).apply {
|
||||||
|
register(RpcClientObservableSerializer)
|
||||||
|
register(RpcClientCordaFutureSerializer(this))
|
||||||
|
register(RxNotificationSerializer(this))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||||
|
throw UnsupportedOperationException()
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,35 @@
|
|||||||
|
package net.corda.client.rpc.internal.serialization.amqp
|
||||||
|
|
||||||
|
import net.corda.core.concurrent.CordaFuture
|
||||||
|
import net.corda.core.toFuture
|
||||||
|
import net.corda.core.toObservable
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import rx.Observable
|
||||||
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serializer for [CordaFuture] instances that can only deserialize such objects (just as the server
|
||||||
|
* side can only serialize them). Futures will have been converted to an Rx [Observable] for serialization.
|
||||||
|
*/
|
||||||
|
class RpcClientCordaFutureSerializer (factory: SerializerFactory)
|
||||||
|
: CustomSerializer.Proxy<CordaFuture<*>, RpcClientCordaFutureSerializer.FutureProxy>(
|
||||||
|
CordaFuture::class.java,
|
||||||
|
RpcClientCordaFutureSerializer.FutureProxy::class.java, factory
|
||||||
|
) {
|
||||||
|
override fun fromProxy(proxy: FutureProxy): CordaFuture<*> {
|
||||||
|
try {
|
||||||
|
return proxy.observable.toFuture()
|
||||||
|
} catch (e: NotSerializableException) {
|
||||||
|
throw NotSerializableException("Failed to deserialize Future from proxy Observable - ${e.message}\n").apply {
|
||||||
|
initCause(e.cause)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun toProxy(obj: CordaFuture<*>): FutureProxy {
|
||||||
|
throw UnsupportedOperationException()
|
||||||
|
}
|
||||||
|
|
||||||
|
data class FutureProxy(val observable: Observable<*>)
|
||||||
|
}
|
@ -0,0 +1,127 @@
|
|||||||
|
package net.corda.client.rpc.internal.serialization.amqp
|
||||||
|
|
||||||
|
|
||||||
|
import net.corda.client.rpc.internal.ObservableContext
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.*
|
||||||
|
import org.apache.qpid.proton.codec.Data
|
||||||
|
import rx.Notification
|
||||||
|
import rx.Observable
|
||||||
|
import rx.subjects.UnicastSubject
|
||||||
|
import java.io.NotSerializableException
|
||||||
|
import java.lang.reflect.Type
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import javax.transaction.NotSupportedException
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
|
||||||
|
* just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized,
|
||||||
|
* what is actually sent is a reference to the observable that can then be subscribed to.
|
||||||
|
*/
|
||||||
|
object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
|
||||||
|
private object RpcObservableContextKey
|
||||||
|
|
||||||
|
fun createContext(
|
||||||
|
serializationContext: SerializationContext,
|
||||||
|
observableContext: ObservableContext
|
||||||
|
) = serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
||||||
|
|
||||||
|
private fun <T> pinInSubscriptions(observable: Observable<T>, hardReferenceStore: MutableSet<Observable<*>>): Observable<T> {
|
||||||
|
val refCount = AtomicInteger(0)
|
||||||
|
return observable.doOnSubscribe {
|
||||||
|
if (refCount.getAndIncrement() == 0) {
|
||||||
|
require(hardReferenceStore.add(observable)) {
|
||||||
|
"Reference store already contained reference $this on add"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.doOnUnsubscribe {
|
||||||
|
if (refCount.decrementAndGet() == 0) {
|
||||||
|
require(hardReferenceStore.remove(observable)) {
|
||||||
|
"Reference store did not contain reference $this on remove"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override val schemaForDocumentation = Schema(
|
||||||
|
listOf(
|
||||||
|
CompositeType(
|
||||||
|
name = type.toString(),
|
||||||
|
label = "",
|
||||||
|
provides = emptyList(),
|
||||||
|
descriptor = descriptor,
|
||||||
|
fields = listOf(
|
||||||
|
Field(
|
||||||
|
name = "observableId",
|
||||||
|
type = "string",
|
||||||
|
requires = emptyList(),
|
||||||
|
default = null,
|
||||||
|
label = null,
|
||||||
|
mandatory = true,
|
||||||
|
multiple = false),
|
||||||
|
Field(
|
||||||
|
name = "observableInstant",
|
||||||
|
type = "long",
|
||||||
|
requires = emptyList(),
|
||||||
|
default = null,
|
||||||
|
label = null,
|
||||||
|
mandatory = true,
|
||||||
|
multiple = false)
|
||||||
|
))))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts the serialized form, a blob, back into an Observable
|
||||||
|
*/
|
||||||
|
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput,
|
||||||
|
context: SerializationContext
|
||||||
|
): Observable<*> {
|
||||||
|
if (RpcObservableContextKey !in context.properties) {
|
||||||
|
throw NotSerializableException("Missing Observable Context Key on Client Context")
|
||||||
|
}
|
||||||
|
|
||||||
|
val observableContext =
|
||||||
|
context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext
|
||||||
|
|
||||||
|
if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list")
|
||||||
|
if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}")
|
||||||
|
|
||||||
|
val observableId: Trace.InvocationId = Trace.InvocationId((obj[0] as String), Instant.ofEpochMilli((obj[1] as Long)))
|
||||||
|
val observable = UnicastSubject.create<Notification<*>>()
|
||||||
|
|
||||||
|
require(observableContext.observableMap.getIfPresent(observableId) == null) {
|
||||||
|
"Multiple Observables arrived with the same ID $observableId"
|
||||||
|
}
|
||||||
|
|
||||||
|
val rpcCallSite = getRpcCallSite(context, observableContext)
|
||||||
|
|
||||||
|
observableContext.observableMap.put(observableId, observable)
|
||||||
|
observableContext.callSiteMap?.put(observableId, rpcCallSite)
|
||||||
|
|
||||||
|
// We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users
|
||||||
|
// don't need to store a reference to the Observables themselves.
|
||||||
|
return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe {
|
||||||
|
// This causes Future completions to give warnings because the corresponding OnComplete sent from the server
|
||||||
|
// will arrive after the client unsubscribes from the observable and consequently invalidates the mapping.
|
||||||
|
// The unsubscribe is due to [ObservableToFuture]'s use of first().
|
||||||
|
observableContext.observableMap.invalidate(observableId)
|
||||||
|
}.dematerialize<Any>()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): Throwable? {
|
||||||
|
val rpcRequestOrObservableId = context.properties[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId
|
||||||
|
return observableContext.callSiteMap?.get(rpcRequestOrObservableId)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun writeDescribedObject(
|
||||||
|
obj: Observable<*>,
|
||||||
|
data: Data,
|
||||||
|
type: Type,
|
||||||
|
output: SerializationOutput,
|
||||||
|
context: SerializationContext
|
||||||
|
) {
|
||||||
|
throw NotSupportedException()
|
||||||
|
}
|
||||||
|
}
|
@ -1,20 +1,25 @@
|
|||||||
package net.corda.client.rpc.internal.serialization.kryo
|
package net.corda.client.rpc.internal.serialization.kryo
|
||||||
|
|
||||||
import com.esotericsoftware.kryo.pool.KryoPool
|
import com.esotericsoftware.kryo.pool.KryoPool
|
||||||
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
|
||||||
import net.corda.core.serialization.internal.SerializationEnvironment
|
import net.corda.core.serialization.internal.SerializationEnvironment
|
||||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
import net.corda.nodeapi.internal.serialization.*
|
||||||
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
|
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
||||||
|
|
||||||
|
private val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic,
|
||||||
|
net.corda.core.serialization.SerializationDefaults.javaClass.classLoader,
|
||||||
|
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||||
|
emptyMap(),
|
||||||
|
true,
|
||||||
|
SerializationContext.UseCase.RPCClient,
|
||||||
|
null)
|
||||||
|
|
||||||
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
|
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
|
||||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||||
return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
|
return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
|
||||||
|
@ -15,12 +15,15 @@ import java.time.Instant
|
|||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
|
* A [Serializer] to deserialize Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
|
||||||
*/
|
*/
|
||||||
object RpcClientObservableSerializer : Serializer<Observable<*>>() {
|
object RpcClientObservableSerializer : Serializer<Observable<*>>() {
|
||||||
private object RpcObservableContextKey
|
private object RpcObservableContextKey
|
||||||
|
|
||||||
fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext {
|
fun createContext(
|
||||||
|
serializationContext: SerializationContext,
|
||||||
|
observableContext: ObservableContext
|
||||||
|
): SerializationContext {
|
||||||
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
return serializationContext.withProperty(RpcObservableContextKey, observableContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
|
import net.corda.core.CordaRuntimeException
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.internal.concurrent.doneFuture
|
import net.corda.core.internal.concurrent.doneFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
@ -11,6 +12,7 @@ import net.corda.testing.node.internal.RPCDriverDSL
|
|||||||
import net.corda.testing.node.internal.rpcDriver
|
import net.corda.testing.node.internal.rpcDriver
|
||||||
import net.corda.testing.node.internal.rpcTestUser
|
import net.corda.testing.node.internal.rpcTestUser
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
|
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.junit.runner.RunWith
|
import org.junit.runner.RunWith
|
||||||
import org.junit.runners.Parameterized
|
import org.junit.runners.Parameterized
|
||||||
@ -77,9 +79,10 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
|
|||||||
// Does nothing, doesn't throw.
|
// Does nothing, doesn't throw.
|
||||||
proxy.void()
|
proxy.void()
|
||||||
|
|
||||||
assertEquals("Barf!", assertFailsWith<IllegalArgumentException> {
|
assertThatThrownBy { proxy.barf() }
|
||||||
proxy.barf()
|
.isInstanceOf(CordaRuntimeException::class.java)
|
||||||
}.message)
|
.hasMessage("java.lang.IllegalArgumentException: Barf!")
|
||||||
|
|
||||||
|
|
||||||
assertEquals("hi 5", proxy.someCalculation("hi", 5))
|
assertEquals("hi 5", proxy.someCalculation("hi", 5))
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import com.esotericsoftware.kryo.KryoException
|
import net.corda.core.CordaRuntimeException
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.messaging.*
|
import net.corda.core.messaging.*
|
||||||
@ -48,23 +48,29 @@ class RPCFailureTests {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `kotlin NPE`() = rpc {
|
fun `kotlin NPE`() = rpc {
|
||||||
assertThatThrownBy { it.kotlinNPE() }.isInstanceOf(KotlinNullPointerException::class.java)
|
assertThatThrownBy { it.kotlinNPE() }.isInstanceOf(CordaRuntimeException::class.java)
|
||||||
|
.hasMessageContaining("kotlin.KotlinNullPointerException")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `kotlin NPE async`() = rpc {
|
fun `kotlin NPE async`() = rpc {
|
||||||
val future = it.kotlinNPEAsync()
|
val future = it.kotlinNPEAsync()
|
||||||
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(KotlinNullPointerException::class.java)
|
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(CordaRuntimeException::class.java)
|
||||||
|
.hasMessageContaining("kotlin.KotlinNullPointerException")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun unserializable() = rpc {
|
fun `unserializable`() = rpc {
|
||||||
assertThatThrownBy { it.getUnserializable() }.isInstanceOf(KryoException::class.java)
|
assertThatThrownBy { it.getUnserializable() }.isInstanceOf(CordaRuntimeException::class.java)
|
||||||
|
.hasMessageContaining("java.io.NotSerializableException:")
|
||||||
|
.hasMessageContaining("Unserializable is not on the whitelist or annotated with @CordaSerializable.")
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `unserializable async`() = rpc {
|
fun `unserializable async`() = rpc {
|
||||||
val future = it.getUnserializableAsync()
|
val future = it.getUnserializableAsync()
|
||||||
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(KryoException::class.java)
|
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(CordaRuntimeException::class.java)
|
||||||
|
.hasMessageContaining("java.io.NotSerializableException:")
|
||||||
|
.hasMessageContaining("Unserializable is not on the whitelist or annotated with @CordaSerializable.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.core.concurrent
|
package net.corda.core.concurrent
|
||||||
|
|
||||||
|
import net.corda.core.serialization.CordaSerializable
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import java.util.concurrent.Future
|
import java.util.concurrent.Future
|
||||||
|
|
||||||
@ -7,6 +8,7 @@ import java.util.concurrent.Future
|
|||||||
* Same as [Future] with additional methods to provide some of the features of [java.util.concurrent.CompletableFuture] while minimising the API surface area.
|
* Same as [Future] with additional methods to provide some of the features of [java.util.concurrent.CompletableFuture] while minimising the API surface area.
|
||||||
* In Kotlin, to avoid compile errors, whenever CordaFuture is used in a parameter or extension method receiver type, its type parameter should be specified with out variance.
|
* In Kotlin, to avoid compile errors, whenever CordaFuture is used in a parameter or extension method receiver type, its type parameter should be specified with out variance.
|
||||||
*/
|
*/
|
||||||
|
@CordaSerializable
|
||||||
interface CordaFuture<V> : Future<V> {
|
interface CordaFuture<V> : Future<V> {
|
||||||
/**
|
/**
|
||||||
* Run the given callback when this future is done, on the completion thread.
|
* Run the given callback when this future is done, on the completion thread.
|
||||||
|
@ -10,6 +10,7 @@ import rx.Observable
|
|||||||
* [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
|
* [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value.
|
||||||
*/
|
*/
|
||||||
@DoNotImplement
|
@DoNotImplement
|
||||||
|
@CordaSerializable
|
||||||
interface FlowHandle<A> : AutoCloseable {
|
interface FlowHandle<A> : AutoCloseable {
|
||||||
/**
|
/**
|
||||||
* The started state machine's ID.
|
* The started state machine's ID.
|
||||||
|
@ -185,6 +185,15 @@ interface SerializationContext {
|
|||||||
enum class UseCase { P2P, RPCServer, RPCClient, Storage, Checkpoint, Testing }
|
enum class UseCase { P2P, RPCServer, RPCClient, Storage, Checkpoint, Testing }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set of well known properties that may be set on a serialization context. This doesn't preclude
|
||||||
|
* others being set that aren't keyed on this enumeration, but for general use properties adding a
|
||||||
|
* well known key here is preferred.
|
||||||
|
*/
|
||||||
|
enum class ContextPropertyKeys {
|
||||||
|
SERIALIZERS
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Global singletons to be used as defaults that are injected elsewhere (generally, in the node or in RPC client).
|
* Global singletons to be used as defaults that are injected elsewhere (generally, in the node or in RPC client).
|
||||||
*/
|
*/
|
||||||
|
@ -6,7 +6,7 @@ package net.corda.core.serialization
|
|||||||
* a proxy serializer can be written that extends this type whose purpose is to move between those an
|
* a proxy serializer can be written that extends this type whose purpose is to move between those an
|
||||||
* unserializable types and an intermediate representation.
|
* unserializable types and an intermediate representation.
|
||||||
*
|
*
|
||||||
* NOTE: The proxy object should be specified as a seperate class. However, this can be defined within the
|
* NOTE: The proxy object should be specified as a separate class. However, this can be defined within the
|
||||||
* scope of the custom serializer.
|
* scope of the custom serializer.
|
||||||
*/
|
*/
|
||||||
interface SerializationCustomSerializer<OBJ, PROXY> {
|
interface SerializationCustomSerializer<OBJ, PROXY> {
|
||||||
|
@ -28,7 +28,9 @@ class InternalNodeException(message: String) : CordaRuntimeException(message) {
|
|||||||
(wrapped as? CordaRuntimeException)?.setCause(null)
|
(wrapped as? CordaRuntimeException)?.setCause(null)
|
||||||
return when {
|
return when {
|
||||||
whitelisted.any { it.isInstance(wrapped) } -> wrapped
|
whitelisted.any { it.isInstance(wrapped) } -> wrapped
|
||||||
else -> InternalNodeException(DEFAULT_MESSAGE)
|
else -> InternalNodeException(DEFAULT_MESSAGE).apply {
|
||||||
|
stackTrace = emptyArray()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,10 +23,11 @@ import net.corda.nodeapi.internal.ContractsJarFile
|
|||||||
import net.corda.nodeapi.internal.DEV_ROOT_CA
|
import net.corda.nodeapi.internal.DEV_ROOT_CA
|
||||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
|
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
|
||||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
|
||||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||||
|
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -278,7 +279,7 @@ class NetworkBootstrapper {
|
|||||||
_contextSerializationEnv.set(SerializationEnvironmentImpl(
|
_contextSerializationEnv.set(SerializationEnvironmentImpl(
|
||||||
SerializationFactoryImpl().apply {
|
SerializationFactoryImpl().apply {
|
||||||
registerScheme(KryoParametersSerializationScheme)
|
registerScheme(KryoParametersSerializationScheme)
|
||||||
registerScheme(AMQPServerSerializationScheme())
|
registerScheme(AMQPParametersSerializationScheme)
|
||||||
},
|
},
|
||||||
AMQP_P2P_CONTEXT)
|
AMQP_P2P_CONTEXT)
|
||||||
)
|
)
|
||||||
@ -292,4 +293,13 @@ class NetworkBootstrapper {
|
|||||||
override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
||||||
override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private object AMQPParametersSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
|
||||||
|
override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
|
||||||
|
override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException()
|
||||||
|
|
||||||
|
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||||
|
return magic == amqpMagic && target == SerializationContext.UseCase.P2P
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -352,7 +352,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
val connection = event.connection
|
val connection = event.connection
|
||||||
val channel = connection?.context as? Channel
|
val channel = connection?.context as? Channel
|
||||||
if (channel != null) {
|
if (channel != null) {
|
||||||
val appProperties = HashMap(amqpMessage.applicationProperties.value as Map<String, Any?>)
|
val appProperties = HashMap(amqpMessage.applicationProperties.value)
|
||||||
appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName
|
appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName
|
||||||
val localAddress = channel.localAddress() as InetSocketAddress
|
val localAddress = channel.localAddress() as InetSocketAddress
|
||||||
val remoteAddress = channel.remoteAddress() as InetSocketAddress
|
val remoteAddress = channel.remoteAddress() as InetSocketAddress
|
||||||
|
@ -13,13 +13,7 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
|||||||
* servers from trying to instantiate any of them.
|
* servers from trying to instantiate any of them.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic,
|
|
||||||
SerializationDefaults.javaClass.classLoader,
|
|
||||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
|
||||||
emptyMap(),
|
|
||||||
true,
|
|
||||||
SerializationContext.UseCase.RPCClient,
|
|
||||||
null)
|
|
||||||
val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(amqpMagic,
|
val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(amqpMagic,
|
||||||
SerializationDefaults.javaClass.classLoader,
|
SerializationDefaults.javaClass.classLoader,
|
||||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||||
|
@ -115,7 +115,7 @@ open class SerializationFactoryImpl(
|
|||||||
return schemes.computeIfAbsent(lookupKey) {
|
return schemes.computeIfAbsent(lookupKey) {
|
||||||
registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single?
|
registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single?
|
||||||
logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes")
|
logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes")
|
||||||
throw UnsupportedOperationException("Serialization scheme not supported.")
|
throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.")
|
||||||
} to magic
|
} to magic
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,13 +17,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
|||||||
* MUST be kept separate!
|
* MUST be kept separate!
|
||||||
*/
|
*/
|
||||||
|
|
||||||
val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic,
|
|
||||||
SerializationDefaults.javaClass.classLoader,
|
|
||||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
|
||||||
emptyMap(),
|
|
||||||
true,
|
|
||||||
SerializationContext.UseCase.RPCServer,
|
|
||||||
null)
|
|
||||||
|
|
||||||
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
|
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic,
|
||||||
SerializationDefaults.javaClass.classLoader,
|
SerializationDefaults.javaClass.classLoader,
|
||||||
|
@ -5,6 +5,7 @@ package net.corda.nodeapi.internal.serialization.amqp
|
|||||||
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
|
import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner
|
||||||
import net.corda.core.cordapp.Cordapp
|
import net.corda.core.cordapp.Cordapp
|
||||||
import net.corda.core.internal.objectOrNewInstance
|
import net.corda.core.internal.objectOrNewInstance
|
||||||
|
import net.corda.core.internal.uncheckedCast
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
import net.corda.core.utilities.ByteSequence
|
import net.corda.core.utilities.ByteSequence
|
||||||
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||||
@ -12,7 +13,6 @@ import net.corda.nodeapi.internal.serialization.DefaultWhitelist
|
|||||||
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
|
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
||||||
import java.lang.reflect.Modifier
|
import java.lang.reflect.Modifier
|
||||||
import java.security.PublicKey
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
@ -118,6 +118,12 @@ abstract class AbstractAMQPSerializationScheme(
|
|||||||
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
|
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
context.properties[ContextPropertyKeys.SERIALIZERS]?.apply {
|
||||||
|
uncheckedCast<Any, List<CustomSerializer<out Any>>>(this).forEach {
|
||||||
|
factory.register(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -131,7 +137,9 @@ abstract class AbstractAMQPSerializationScheme(
|
|||||||
|
|
||||||
protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory
|
protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory
|
||||||
protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory
|
protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory
|
||||||
protected open val publicKeySerializer: CustomSerializer.Implements<PublicKey> = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer
|
|
||||||
|
// Not used as a simple direct import to facilitate testing
|
||||||
|
open val publicKeySerializer : CustomSerializer<*> = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer
|
||||||
|
|
||||||
private fun getSerializerFactory(context: SerializationContext): SerializerFactory {
|
private fun getSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||||
return serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
return serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||||
@ -162,52 +170,3 @@ abstract class AbstractAMQPSerializationScheme(
|
|||||||
|
|
||||||
protected fun canDeserializeVersion(magic: CordaSerializationMagic) = magic == amqpMagic
|
protected fun canDeserializeVersion(magic: CordaSerializationMagic) = magic == amqpMagic
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented
|
|
||||||
class AMQPServerSerializationScheme(
|
|
||||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
|
|
||||||
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
|
|
||||||
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
|
|
||||||
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, ConcurrentHashMap())
|
|
||||||
|
|
||||||
constructor() : this(emptySet(), ConcurrentHashMap())
|
|
||||||
|
|
||||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
|
||||||
throw UnsupportedOperationException()
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
|
||||||
throw UnsupportedOperationException()
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
|
||||||
return canDeserializeVersion(magic) &&
|
|
||||||
(target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented
|
|
||||||
class AMQPClientSerializationScheme(
|
|
||||||
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
|
|
||||||
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
|
|
||||||
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
|
|
||||||
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, ConcurrentHashMap())
|
|
||||||
|
|
||||||
constructor() : this(emptySet(), ConcurrentHashMap())
|
|
||||||
|
|
||||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
|
||||||
throw UnsupportedOperationException()
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
|
||||||
throw UnsupportedOperationException()
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
|
||||||
return canDeserializeVersion(magic) &&
|
|
||||||
(target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.serialization.amqp.custom
|
|||||||
|
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.custom.ClassSerializer.ClassProxy
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A serializer for [Class] that uses [ClassProxy] proxy object to write out
|
* A serializer for [Class] that uses [ClassProxy] proxy object to write out
|
||||||
|
@ -39,7 +39,7 @@ object InputStreamSerializer : CustomSerializer.Implements<InputStream>(InputStr
|
|||||||
|
|
||||||
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput,
|
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput,
|
||||||
context: SerializationContext
|
context: SerializationContext
|
||||||
): InputStream {
|
) : InputStream {
|
||||||
val bits = input.readObject(obj, schemas, ByteArray::class.java, context) as ByteArray
|
val bits = input.readObject(obj, schemas, ByteArray::class.java, context) as ByteArray
|
||||||
return bits.inputStream()
|
return bits.inputStream()
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,28 @@
|
|||||||
|
package net.corda.nodeapi.internal.serialization.amqp.custom
|
||||||
|
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import rx.Notification
|
||||||
|
|
||||||
|
class RxNotificationSerializer(
|
||||||
|
factory: SerializerFactory
|
||||||
|
) : CustomSerializer.Proxy<rx.Notification<*>, RxNotificationSerializer.Proxy>(
|
||||||
|
Notification::class.java,
|
||||||
|
Proxy::class.java,
|
||||||
|
factory
|
||||||
|
) {
|
||||||
|
data class Proxy(
|
||||||
|
val kind: Notification.Kind,
|
||||||
|
val t: Throwable?,
|
||||||
|
val value: Any?)
|
||||||
|
|
||||||
|
override fun toProxy(obj: Notification<*>) = Proxy(obj.kind, obj.throwable, obj.value)
|
||||||
|
|
||||||
|
override fun fromProxy(proxy: Proxy): Notification<*> {
|
||||||
|
return when (proxy.kind) {
|
||||||
|
Notification.Kind.OnCompleted -> Notification.createOnCompleted<Any>()
|
||||||
|
Notification.Kind.OnError -> Notification.createOnError<Any>(proxy.t)
|
||||||
|
Notification.Kind.OnNext -> Notification.createOnNext(proxy.value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -8,12 +8,14 @@ import net.corda.core.internal.div
|
|||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
|
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||||
import net.corda.nodeapi.internal.createDevKeyStores
|
import net.corda.nodeapi.internal.createDevKeyStores
|
||||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
|
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.core.BOB_NAME
|
import net.corda.testing.core.BOB_NAME
|
||||||
@ -335,8 +337,8 @@ class X509UtilitiesTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `serialize - deserialize X509Certififcate`() {
|
fun `serialize - deserialize X509Certififcate`() {
|
||||||
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
|
val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) }
|
||||||
val context = SerializationContextImpl(kryoMagic,
|
val context = SerializationContextImpl(amqpMagic,
|
||||||
javaClass.classLoader,
|
javaClass.classLoader,
|
||||||
AllWhitelist,
|
AllWhitelist,
|
||||||
emptyMap(),
|
emptyMap(),
|
||||||
@ -351,8 +353,8 @@ class X509UtilitiesTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `serialize - deserialize X509CertPath`() {
|
fun `serialize - deserialize X509CertPath`() {
|
||||||
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
|
val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) }
|
||||||
val context = SerializationContextImpl(kryoMagic,
|
val context = SerializationContextImpl(amqpMagic,
|
||||||
javaClass.classLoader,
|
javaClass.classLoader,
|
||||||
AllWhitelist,
|
AllWhitelist,
|
||||||
emptyMap(),
|
emptyMap(),
|
||||||
|
@ -30,6 +30,7 @@ import net.corda.core.serialization.SerializationContext
|
|||||||
import net.corda.core.serialization.SerializationFactory
|
import net.corda.core.serialization.SerializationFactory
|
||||||
import net.corda.core.transactions.LedgerTransaction
|
import net.corda.core.transactions.LedgerTransaction
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
|
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||||
import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA
|
import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA
|
||||||
import net.corda.nodeapi.internal.crypto.ContentSignerBuilder
|
import net.corda.nodeapi.internal.crypto.ContentSignerBuilder
|
||||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||||
|
@ -12,6 +12,7 @@ import net.corda.core.contracts.PrivacySalt
|
|||||||
import net.corda.core.crypto.*
|
import net.corda.core.crypto.*
|
||||||
import net.corda.core.internal.FetchDataFlow
|
import net.corda.core.internal.FetchDataFlow
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
|
import net.corda.core.utilities.ByteSequence
|
||||||
import net.corda.core.utilities.ProgressTracker
|
import net.corda.core.utilities.ProgressTracker
|
||||||
import net.corda.core.utilities.sequence
|
import net.corda.core.utilities.sequence
|
||||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||||
@ -76,11 +77,12 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
|
|||||||
assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null))
|
assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
|
fun `serialised form is stable when the same object instance is added to the deserialised object graph`() {
|
||||||
val noReferencesContext = context.withoutReferences()
|
val noReferencesContext = context.withoutReferences()
|
||||||
val obj = Ints.toByteArray(0x01234567).sequence()
|
val obj : ByteSequence = Ints.toByteArray(0x01234567).sequence()
|
||||||
val originalList = arrayListOf(obj)
|
val originalList : ArrayList<ByteSequence> = arrayListOf(obj)
|
||||||
val deserialisedList = originalList.serialize(factory, noReferencesContext).deserialize(factory, noReferencesContext)
|
val deserialisedList = originalList.serialize(factory, noReferencesContext).deserialize(factory, noReferencesContext)
|
||||||
originalList += obj
|
originalList += obj
|
||||||
deserialisedList += obj
|
deserialisedList += obj
|
||||||
|
@ -2,6 +2,7 @@ package net.corda.node
|
|||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import net.corda.client.rpc.CordaRPCClient
|
import net.corda.client.rpc.CordaRPCClient
|
||||||
|
import net.corda.core.CordaRuntimeException
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.StartableByRPC
|
import net.corda.core.flows.StartableByRPC
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
@ -11,6 +12,7 @@ import net.corda.core.messaging.startFlow
|
|||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.node.internal.NodeStartup
|
import net.corda.node.internal.NodeStartup
|
||||||
import net.corda.node.services.Permissions.Companion.startFlow
|
import net.corda.node.services.Permissions.Companion.startFlow
|
||||||
|
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||||
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
|
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.driver.DriverParameters
|
import net.corda.testing.driver.DriverParameters
|
||||||
@ -28,8 +30,11 @@ class BootTests {
|
|||||||
fun `java deserialization is disabled`() {
|
fun `java deserialization is disabled`() {
|
||||||
driver {
|
driver {
|
||||||
val user = User("u", "p", setOf(startFlow<ObjectInputStreamFlow>()))
|
val user = User("u", "p", setOf(startFlow<ObjectInputStreamFlow>()))
|
||||||
val future = CordaRPCClient(startNode(rpcUsers = listOf(user)).getOrThrow().rpcAddress).start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue
|
val future = CordaRPCClient(startNode(rpcUsers = listOf(user)).getOrThrow().rpcAddress).
|
||||||
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(InvalidClassException::class.java).hasMessage("filter status: REJECTED")
|
start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue
|
||||||
|
assertThatThrownBy { future.getOrThrow() }
|
||||||
|
.isInstanceOf(CordaRuntimeException::class.java)
|
||||||
|
.hasMessageContaining(InternalNodeException.defaultMessage())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,8 +33,8 @@ class RpcExceptionHandlingTest {
|
|||||||
|
|
||||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||||
|
|
||||||
assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }
|
assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
||||||
.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
|
||||||
assertThat(exception).hasNoCause()
|
assertThat(exception).hasNoCause()
|
||||||
assertThat(exception.stackTrace).isEmpty()
|
assertThat(exception.stackTrace).isEmpty()
|
||||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
||||||
@ -49,8 +49,8 @@ class RpcExceptionHandlingTest {
|
|||||||
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow()
|
||||||
val clientRelevantMessage = "This is for the players!"
|
val clientRelevantMessage = "This is for the players!"
|
||||||
|
|
||||||
assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }
|
assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception ->
|
||||||
.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception ->
|
|
||||||
assertThat(exception).hasNoCause()
|
assertThat(exception).hasNoCause()
|
||||||
assertThat(exception.stackTrace).isEmpty()
|
assertThat(exception.stackTrace).isEmpty()
|
||||||
assertThat(exception.message).isEqualTo(clientRelevantMessage)
|
assertThat(exception.message).isEqualTo(clientRelevantMessage)
|
||||||
@ -81,6 +81,7 @@ class RpcExceptionHandlingTest {
|
|||||||
|
|
||||||
assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() }
|
assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() }
|
||||||
.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
.isInstanceOfSatisfying(InternalNodeException::class.java) { exception ->
|
||||||
|
|
||||||
assertThat(exception).hasNoCause()
|
assertThat(exception).hasNoCause()
|
||||||
assertThat(exception.stackTrace).isEmpty()
|
assertThat(exception.stackTrace).isEmpty()
|
||||||
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage())
|
||||||
|
@ -26,6 +26,7 @@ import net.corda.node.internal.artemis.BrokerAddresses
|
|||||||
import net.corda.node.internal.cordapp.CordappLoader
|
import net.corda.node.internal.cordapp.CordappLoader
|
||||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||||
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
|
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
|
||||||
|
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||||
import net.corda.node.services.api.NodePropertiesStore
|
import net.corda.node.services.api.NodePropertiesStore
|
||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
@ -42,7 +43,6 @@ import net.corda.nodeapi.internal.addShutdownHook
|
|||||||
import net.corda.nodeapi.internal.bridging.BridgeControlListener
|
import net.corda.nodeapi.internal.bridging.BridgeControlListener
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.serialization.*
|
import net.corda.nodeapi.internal.serialization.*
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import rx.Scheduler
|
import rx.Scheduler
|
||||||
@ -389,10 +389,10 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
registerScheme(KryoClientSerializationScheme())
|
registerScheme(KryoClientSerializationScheme())
|
||||||
},
|
},
|
||||||
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
|
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
|
||||||
rpcServerContext = KRYO_RPC_SERVER_CONTEXT.withClassLoader(classloader),
|
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),
|
||||||
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
|
storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader),
|
||||||
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader),
|
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader),
|
||||||
rpcClientContext = if (configuration.shouldInitCrashShell()) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node
|
rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node
|
||||||
}
|
}
|
||||||
|
|
||||||
private var rpcMessagingClient: RPCMessagingClient? = null
|
private var rpcMessagingClient: RPCMessagingClient? = null
|
||||||
|
@ -0,0 +1,46 @@
|
|||||||
|
package net.corda.node.serialization.amqp
|
||||||
|
|
||||||
|
import net.corda.core.cordapp.Cordapp
|
||||||
|
import net.corda.core.serialization.ClassWhitelist
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.core.serialization.SerializationCustomSerializer
|
||||||
|
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.custom.RxNotificationSerializer
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When set as the serialization scheme, defines the RPC Server serialization scheme as using the Corda
|
||||||
|
* AMQP implementation.
|
||||||
|
*/
|
||||||
|
class AMQPServerSerializationScheme(
|
||||||
|
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
|
||||||
|
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
|
||||||
|
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
|
||||||
|
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, ConcurrentHashMap())
|
||||||
|
|
||||||
|
constructor() : this(emptySet(), ConcurrentHashMap())
|
||||||
|
|
||||||
|
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||||
|
throw UnsupportedOperationException()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun rpcServerSerializerFactory(context: SerializationContext) =
|
||||||
|
SerializerFactory(
|
||||||
|
context.whitelist,
|
||||||
|
context.deserializationClassLoader
|
||||||
|
).apply {
|
||||||
|
register(RpcServerObservableSerializer())
|
||||||
|
register(RpcServerCordaFutureSerializer(this))
|
||||||
|
register(RxNotificationSerializer(this))
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean {
|
||||||
|
return canDeserializeVersion(magic) &&
|
||||||
|
( target == SerializationContext.UseCase.P2P
|
||||||
|
|| target == SerializationContext.UseCase.Storage
|
||||||
|
|| target == SerializationContext.UseCase.RPCServer)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,35 @@
|
|||||||
|
package net.corda.node.serialization.amqp
|
||||||
|
|
||||||
|
import net.corda.core.concurrent.CordaFuture
|
||||||
|
import net.corda.core.toObservable
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import rx.Observable
|
||||||
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serializer for [CordaFuture] objects where Futures are converted to Observables and
|
||||||
|
* are thus dealt with by the [RpcServerObservableSerializer]
|
||||||
|
*/
|
||||||
|
class RpcServerCordaFutureSerializer(factory: SerializerFactory)
|
||||||
|
: CustomSerializer.Proxy<CordaFuture<*>,
|
||||||
|
RpcServerCordaFutureSerializer.FutureProxy>(
|
||||||
|
CordaFuture::class.java, RpcServerCordaFutureSerializer.FutureProxy::class.java, factory
|
||||||
|
) {
|
||||||
|
override fun fromProxy(proxy: RpcServerCordaFutureSerializer.FutureProxy): CordaFuture<*> {
|
||||||
|
throw UnsupportedOperationException()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun toProxy(obj: CordaFuture<*>): RpcServerCordaFutureSerializer.FutureProxy {
|
||||||
|
try {
|
||||||
|
return FutureProxy(obj.toObservable())
|
||||||
|
} catch (e: NotSerializableException) {
|
||||||
|
throw (NotSerializableException("Failed to serialize Future as proxy Observable - ${e.message}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
data class FutureProxy(val observable: Observable<*>)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,140 @@
|
|||||||
|
package net.corda.node.serialization.amqp
|
||||||
|
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.core.utilities.loggerFor
|
||||||
|
import net.corda.node.services.messaging.ObservableContextInterface
|
||||||
|
import net.corda.node.services.messaging.ObservableSubscription
|
||||||
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.*
|
||||||
|
import org.apache.qpid.proton.codec.Data
|
||||||
|
|
||||||
|
import rx.Notification
|
||||||
|
import rx.Observable
|
||||||
|
import rx.Subscriber
|
||||||
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
|
import java.lang.reflect.Type
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server side serializer that notionally serializes RxObservables when used by the RPC
|
||||||
|
* framework for event subscriptions. Notional in the sense that the actual observable
|
||||||
|
* isn't serialized, rather a reference to the observable is, this is then used by
|
||||||
|
* the client side RPC handler to subscribe to the observable stream.
|
||||||
|
*/
|
||||||
|
class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>(
|
||||||
|
Observable::class.java
|
||||||
|
) {
|
||||||
|
// Would be great to make this private, but then it's so much harder to unit test
|
||||||
|
object RpcObservableContextKey
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
fun createContext(
|
||||||
|
serializationContext: SerializationContext,
|
||||||
|
observableContext: ObservableContextInterface
|
||||||
|
) = serializationContext.withProperty(
|
||||||
|
RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||||
|
}
|
||||||
|
|
||||||
|
override val schemaForDocumentation = Schema(
|
||||||
|
listOf(
|
||||||
|
CompositeType(
|
||||||
|
name = type.toString(),
|
||||||
|
label = "",
|
||||||
|
provides = emptyList(),
|
||||||
|
descriptor = descriptor,
|
||||||
|
fields = listOf(
|
||||||
|
Field(
|
||||||
|
name = "observableId",
|
||||||
|
type = "string",
|
||||||
|
requires = emptyList(),
|
||||||
|
default = null,
|
||||||
|
label = null,
|
||||||
|
mandatory = true,
|
||||||
|
multiple = false),
|
||||||
|
Field(
|
||||||
|
name = "observableInstant",
|
||||||
|
type = "long",
|
||||||
|
requires = emptyList(),
|
||||||
|
default = null,
|
||||||
|
label = null,
|
||||||
|
mandatory = true,
|
||||||
|
multiple = false)
|
||||||
|
))))
|
||||||
|
|
||||||
|
override fun readObject(
|
||||||
|
obj: Any, schemas: SerializationSchemas,
|
||||||
|
input: DeserializationInput,
|
||||||
|
context: SerializationContext
|
||||||
|
): Observable<*> {
|
||||||
|
throw UnsupportedOperationException()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun writeDescribedObject(
|
||||||
|
obj: Observable<*>,
|
||||||
|
data: Data,
|
||||||
|
type: Type,
|
||||||
|
output: SerializationOutput,
|
||||||
|
context: SerializationContext
|
||||||
|
) {
|
||||||
|
val observableId = Trace.InvocationId.newInstance()
|
||||||
|
if (RpcServerObservableSerializer.RpcObservableContextKey !in context.properties) {
|
||||||
|
throw NotSerializableException("Missing Observable Key on serialization context - $type")
|
||||||
|
}
|
||||||
|
|
||||||
|
val observableContext = context.properties[RpcServerObservableSerializer.RpcObservableContextKey]
|
||||||
|
as ObservableContextInterface
|
||||||
|
|
||||||
|
data.withList {
|
||||||
|
data.putString(observableId.value)
|
||||||
|
data.putLong(observableId.timestamp.toEpochMilli())
|
||||||
|
}
|
||||||
|
|
||||||
|
val observableWithSubscription = ObservableSubscription(
|
||||||
|
subscription = obj.materialize().subscribe(
|
||||||
|
object : Subscriber<Notification<*>>() {
|
||||||
|
override fun onNext(observation: Notification<*>) {
|
||||||
|
if (!isUnsubscribed) {
|
||||||
|
val message = RPCApi.ServerToClient.Observation(
|
||||||
|
id = observableId,
|
||||||
|
content = observation,
|
||||||
|
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||||
|
)
|
||||||
|
observableContext.sendMessage(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onError(exception: Throwable) {
|
||||||
|
loggerFor<RpcServerObservableSerializer>().error(
|
||||||
|
"onError called in materialize()d RPC Observable", exception)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onCompleted() {
|
||||||
|
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||||
|
if (observables != null) {
|
||||||
|
observables.remove(observableId)
|
||||||
|
if (observables.isEmpty()) {
|
||||||
|
null
|
||||||
|
} else {
|
||||||
|
observables
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables ->
|
||||||
|
if (observables == null) {
|
||||||
|
hashSetOf(observableId)
|
||||||
|
} else {
|
||||||
|
observables.add(observableId)
|
||||||
|
observables
|
||||||
|
}
|
||||||
|
}
|
||||||
|
observableContext.observableMap.put(observableId, observableWithSubscription)
|
||||||
|
}
|
||||||
|
}
|
@ -19,6 +19,7 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
|||||||
private object RpcObservableContextKey
|
private object RpcObservableContextKey
|
||||||
|
|
||||||
private val log = LoggerFactory.getLogger(javaClass)
|
private val log = LoggerFactory.getLogger(javaClass)
|
||||||
|
|
||||||
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
||||||
return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||||
}
|
}
|
||||||
@ -80,7 +81,6 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun Output.writeInvocationId(id: Trace.InvocationId) {
|
private fun Output.writeInvocationId(id: Trace.InvocationId) {
|
||||||
|
|
||||||
writeString(id.value)
|
writeString(id.value)
|
||||||
writeLong(id.timestamp.toEpochMilli())
|
writeLong(id.timestamp.toEpochMilli())
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,16 @@
|
|||||||
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
|
interface ObservableContextInterface {
|
||||||
|
fun sendMessage(serverToClient: RPCApi.ServerToClient)
|
||||||
|
|
||||||
|
val observableMap: Cache<Trace.InvocationId, ObservableSubscription>
|
||||||
|
val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<Trace.InvocationId>>
|
||||||
|
val deduplicationIdentity: String
|
||||||
|
val clientAddress: SimpleString
|
||||||
|
}
|
@ -15,13 +15,14 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.internal.LifeCycle
|
import net.corda.core.internal.LifeCycle
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.internal.security.AuthorizingSubject
|
import net.corda.node.internal.security.AuthorizingSubject
|
||||||
import net.corda.node.internal.security.RPCSecurityManager
|
import net.corda.node.internal.security.RPCSecurityManager
|
||||||
import net.corda.node.serialization.kryo.RpcServerObservableSerializer
|
|
||||||
import net.corda.node.services.logging.pushToLoggingContext
|
import net.corda.node.services.logging.pushToLoggingContext
|
||||||
|
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
import net.corda.nodeapi.externalTrace
|
import net.corda.nodeapi.externalTrace
|
||||||
import net.corda.nodeapi.impersonatedActor
|
import net.corda.nodeapi.impersonatedActor
|
||||||
@ -45,6 +46,8 @@ import java.util.*
|
|||||||
import java.util.concurrent.*
|
import java.util.concurrent.*
|
||||||
import kotlin.concurrent.thread
|
import kotlin.concurrent.thread
|
||||||
|
|
||||||
|
private typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
||||||
|
|
||||||
data class RPCServerConfiguration(
|
data class RPCServerConfiguration(
|
||||||
/** The number of threads to use for handling RPC requests */
|
/** The number of threads to use for handling RPC requests */
|
||||||
val rpcThreadPoolSize: Int,
|
val rpcThreadPoolSize: Int,
|
||||||
@ -410,15 +413,18 @@ class RPCServer(
|
|||||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||||
*/
|
*/
|
||||||
inner class ObservableContext(
|
inner class ObservableContext(
|
||||||
val observableMap: ObservableSubscriptionMap,
|
override val observableMap: ObservableSubscriptionMap,
|
||||||
val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
|
override val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<InvocationId>>,
|
||||||
val deduplicationIdentity: String,
|
override val deduplicationIdentity: String,
|
||||||
val clientAddress: SimpleString
|
override val clientAddress: SimpleString
|
||||||
) {
|
) : ObservableContextInterface {
|
||||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
|
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(
|
||||||
|
observableContext = this,
|
||||||
|
serializationContext = SerializationDefaults.RPC_SERVER_CONTEXT)
|
||||||
|
|
||||||
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
override fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||||
sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, serializationContextWithObservableContext, serverToClient))
|
sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress,
|
||||||
|
serializationContextWithObservableContext, serverToClient))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -478,4 +484,4 @@ class ObservableSubscription(
|
|||||||
val subscription: Subscription
|
val subscription: Subscription
|
||||||
)
|
)
|
||||||
|
|
||||||
typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
|
||||||
|
@ -10,12 +10,12 @@ import net.corda.core.serialization.serialize
|
|||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
|
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||||
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
|
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
|
||||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Scheduler
|
import rx.Scheduler
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
@ -0,0 +1,100 @@
|
|||||||
|
package net.corda.node.internal.serialization
|
||||||
|
|
||||||
|
import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme
|
||||||
|
import net.corda.node.internal.serialization.testutils.TestObservableContext as ServerObservableContext
|
||||||
|
import net.corda.node.services.messaging.ObservableSubscription
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput
|
||||||
|
|
||||||
|
import co.paralleluniverse.common.util.SameThreadExecutor
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine
|
||||||
|
import com.github.benmanes.caffeine.cache.RemovalListener
|
||||||
|
import com.nhaarman.mockito_kotlin.mock
|
||||||
|
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
|
||||||
|
import net.corda.node.internal.serialization.testutils.serializationContext
|
||||||
|
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
||||||
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import org.junit.Test
|
||||||
|
import rx.Notification
|
||||||
|
import rx.Observable
|
||||||
|
import rx.Subscription
|
||||||
|
import rx.subjects.UnicastSubject
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.*
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
class RoundTripObservableSerializerTests {
|
||||||
|
private fun getID() = Trace.InvocationId("test1", Instant.now())
|
||||||
|
|
||||||
|
private fun subscriptionMap(
|
||||||
|
id: Trace.InvocationId
|
||||||
|
) : Cache<Trace.InvocationId, ObservableSubscription> {
|
||||||
|
val subMap: Cache<Trace.InvocationId, ObservableSubscription> = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
|
||||||
|
.maximumSize(100)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
subMap.put(id, ObservableSubscription(mock<Subscription>()))
|
||||||
|
|
||||||
|
return subMap
|
||||||
|
}
|
||||||
|
|
||||||
|
private val observablesToReap = ThreadBox(object {
|
||||||
|
var observables = ArrayList<Trace.InvocationId>()
|
||||||
|
})
|
||||||
|
|
||||||
|
private fun createRpcObservableMap(): Cache<Trace.InvocationId, UnicastSubject<Notification<*>>> {
|
||||||
|
val onObservableRemove = RemovalListener<Trace.InvocationId, UnicastSubject<Notification<*>>> { key, value, cause ->
|
||||||
|
val observableId = key!!
|
||||||
|
|
||||||
|
observablesToReap.locked { observables.add(observableId) }
|
||||||
|
}
|
||||||
|
|
||||||
|
return Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun roundTripTest1() {
|
||||||
|
val serializationScheme = AMQPRoundTripRPCSerializationScheme(serializationContext)
|
||||||
|
|
||||||
|
// Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap,
|
||||||
|
// the client as a property of the deserializer which, in the actual RPC client, is pulled off of
|
||||||
|
// the received message
|
||||||
|
val id : Trace.InvocationId = getID()
|
||||||
|
|
||||||
|
val serverObservableContext = ServerObservableContext(
|
||||||
|
subscriptionMap(id),
|
||||||
|
clientAddressToObservables = ConcurrentHashMap(),
|
||||||
|
deduplicationIdentity = "thisIsATest",
|
||||||
|
clientAddress = SimpleString("clientAddress"))
|
||||||
|
|
||||||
|
val serverSerializer = serializationScheme.rpcServerSerializerFactory(serverObservableContext)
|
||||||
|
|
||||||
|
val clientObservableContext = ClientObservableContext(
|
||||||
|
callSiteMap = null,
|
||||||
|
observableMap = createRpcObservableMap(),
|
||||||
|
hardReferenceStore = Collections.synchronizedSet(mutableSetOf<Observable<*>>())
|
||||||
|
)
|
||||||
|
|
||||||
|
val clientSerializer = serializationScheme.rpcClientSerializerFactory(clientObservableContext, id)
|
||||||
|
|
||||||
|
|
||||||
|
// What we're actually going to serialize then deserialize
|
||||||
|
val obs = Observable.create<Int>({ 12 })
|
||||||
|
|
||||||
|
val serverSerializationContext = RpcServerObservableSerializer.createContext(
|
||||||
|
serializationContext, serverObservableContext)
|
||||||
|
|
||||||
|
val clientSerializationContext = RpcClientObservableSerializer.createContext(
|
||||||
|
serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id)
|
||||||
|
|
||||||
|
|
||||||
|
val blob = SerializationOutput(serverSerializer).serialize(obs, serverSerializationContext)
|
||||||
|
val obs2 = DeserializationInput(clientSerializer).deserialize(blob, clientSerializationContext)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,87 @@
|
|||||||
|
package net.corda.node.internal.serialization
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine
|
||||||
|
import com.nhaarman.mockito_kotlin.mock
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.node.internal.serialization.testutils.*
|
||||||
|
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
||||||
|
import net.corda.node.services.messaging.ObservableSubscription
|
||||||
|
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import org.junit.Test
|
||||||
|
import rx.Observable
|
||||||
|
import rx.Subscription
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
|
class RpcServerObservableSerializerTests {
|
||||||
|
|
||||||
|
private fun subscriptionMap(): Cache<Trace.InvocationId, ObservableSubscription> {
|
||||||
|
val subMap: Cache<Trace.InvocationId, ObservableSubscription> = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
|
||||||
|
.maximumSize(100)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
subMap.put(Trace.InvocationId("test1", Instant.now()), ObservableSubscription(mock<Subscription>()))
|
||||||
|
|
||||||
|
return subMap
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun canSerializerBeRegistered() {
|
||||||
|
val sf = SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist)
|
||||||
|
|
||||||
|
try {
|
||||||
|
sf.register(RpcServerObservableSerializer())
|
||||||
|
} catch (e: Exception) {
|
||||||
|
throw Error("Observable serializer must be registerable with factory, unexpected exception - ${e.message}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun canAssociateWithContext() {
|
||||||
|
val observable = TestObservableContext(
|
||||||
|
subscriptionMap(),
|
||||||
|
clientAddressToObservables = ConcurrentHashMap(),
|
||||||
|
deduplicationIdentity = "thisIsATest",
|
||||||
|
clientAddress = SimpleString("clientAddress"))
|
||||||
|
|
||||||
|
val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable)
|
||||||
|
|
||||||
|
assertEquals(1, newContext.properties.size)
|
||||||
|
assertTrue(newContext.properties.containsKey(RpcServerObservableSerializer.RpcObservableContextKey))
|
||||||
|
assertEquals(observable, newContext.properties[RpcServerObservableSerializer.RpcObservableContextKey])
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun serialiseFakeObservable() {
|
||||||
|
val testClientAddress = "clientAddres"
|
||||||
|
val observable = TestObservableContext(
|
||||||
|
subscriptionMap(),
|
||||||
|
clientAddressToObservables = ConcurrentHashMap(),
|
||||||
|
deduplicationIdentity = "thisIsATest",
|
||||||
|
clientAddress = SimpleString(testClientAddress))
|
||||||
|
|
||||||
|
val sf = SerializerFactory(
|
||||||
|
cl = javaClass.classLoader,
|
||||||
|
whitelist = AllWhitelist
|
||||||
|
).apply {
|
||||||
|
register(RpcServerObservableSerializer())
|
||||||
|
}
|
||||||
|
|
||||||
|
val obs = Observable.create<Int>({ 12 })
|
||||||
|
val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable)
|
||||||
|
|
||||||
|
try {
|
||||||
|
SerializationOutput(sf).serializeAndReturnSchema(obs, newContext)
|
||||||
|
} catch (e: Exception) {
|
||||||
|
throw Error("Serialization of observable should not throw - ${e.message}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,54 @@
|
|||||||
|
package net.corda.node.internal.serialization.testutils
|
||||||
|
|
||||||
|
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.core.cordapp.Cordapp
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.core.serialization.SerializationCustomSerializer
|
||||||
|
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
||||||
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||||
|
import net.corda.nodeapi.internal.serialization.CordaSerializationMagic
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Special serialization context for the round trip tests that allows for both server and client RPC
|
||||||
|
* operations
|
||||||
|
*/
|
||||||
|
class AMQPRoundTripRPCSerializationScheme(
|
||||||
|
private val serializationContext: SerializationContext,
|
||||||
|
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet())
|
||||||
|
: AbstractAMQPSerializationScheme(
|
||||||
|
cordappCustomSerializers
|
||||||
|
) {
|
||||||
|
constructor(
|
||||||
|
serializationContext: SerializationContext,
|
||||||
|
cordapps: List<Cordapp>) : this(serializationContext, cordapps.customSerializers)
|
||||||
|
|
||||||
|
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||||
|
return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply {
|
||||||
|
register(RpcClientObservableSerializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||||
|
return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply {
|
||||||
|
register(RpcServerObservableSerializer())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun canDeserializeVersion(
|
||||||
|
magic: CordaSerializationMagic,
|
||||||
|
target: SerializationContext.UseCase) = true
|
||||||
|
|
||||||
|
fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) =
|
||||||
|
rpcClientSerializerFactory(
|
||||||
|
RpcClientObservableSerializer.createContext(serializationContext, observableContext)
|
||||||
|
.withProperty(RPCApi.RpcRequestOrObservableIdKey, id))
|
||||||
|
|
||||||
|
fun rpcServerSerializerFactory(observableContext: TestObservableContext) =
|
||||||
|
rpcServerSerializerFactory(
|
||||||
|
RpcServerObservableSerializer.createContext(serializationContext, observableContext))
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
package net.corda.node.internal.serialization.testutils
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache
|
||||||
|
import net.corda.core.context.Trace
|
||||||
|
import net.corda.node.services.messaging.ObservableContextInterface
|
||||||
|
import net.corda.node.services.messaging.ObservableSubscription
|
||||||
|
import net.corda.nodeapi.RPCApi
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
|
class TestObservableContext(
|
||||||
|
override val observableMap: Cache<Trace.InvocationId, ObservableSubscription>,
|
||||||
|
override val clientAddressToObservables: ConcurrentHashMap<SimpleString, HashSet<Trace.InvocationId>>,
|
||||||
|
override val deduplicationIdentity: String,
|
||||||
|
override val clientAddress: SimpleString
|
||||||
|
) : ObservableContextInterface {
|
||||||
|
override fun sendMessage(serverToClient: RPCApi.ServerToClient) { }
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
package net.corda.node.internal.serialization.testutils
|
||||||
|
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||||
|
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||||
|
|
||||||
|
val serializationProperties: MutableMap<Any, Any> = mutableMapOf()
|
||||||
|
|
||||||
|
val serializationContext = SerializationContextImpl(
|
||||||
|
preferredSerializationVersion = amqpMagic,
|
||||||
|
deserializationClassLoader = ClassLoader.getSystemClassLoader(),
|
||||||
|
whitelist = AllWhitelist,
|
||||||
|
properties = serializationProperties,
|
||||||
|
objectReferencesEnabled = false,
|
||||||
|
useCase = SerializationContext.UseCase.Testing,
|
||||||
|
encoding = null)
|
@ -0,0 +1,11 @@
|
|||||||
|
package net.corda.node.internal.serialization.testutils
|
||||||
|
|
||||||
|
import net.corda.core.serialization.SerializationContext
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||||
|
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactoryFactory
|
||||||
|
|
||||||
|
class TestSerializerFactoryFactory : SerializerFactoryFactory() {
|
||||||
|
override fun make(context: SerializationContext): SerializerFactory {
|
||||||
|
return super.make(context)
|
||||||
|
}
|
||||||
|
}
|
@ -1,9 +1,9 @@
|
|||||||
package net.corda.testing.node.internal
|
package net.corda.testing.node.internal
|
||||||
|
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||||
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.context.AuthServiceId
|
import net.corda.core.context.AuthServiceId
|
||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
@ -23,7 +23,7 @@ import net.corda.node.services.messaging.RPCServerConfiguration
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
import net.corda.nodeapi.ConnectionDirection
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
|
import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT
|
||||||
import net.corda.testing.common.internal.testNetworkParameters
|
import net.corda.testing.common.internal.testNetworkParameters
|
||||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||||
import net.corda.testing.driver.JmxPolicy
|
import net.corda.testing.driver.JmxPolicy
|
||||||
@ -513,8 +513,8 @@ class RandomRpcUser {
|
|||||||
val hostAndPort = NetworkHostAndPort.parse(args[1])
|
val hostAndPort = NetworkHostAndPort.parse(args[1])
|
||||||
val username = args[2]
|
val username = args[2]
|
||||||
val password = args[3]
|
val password = args[3]
|
||||||
KryoClientSerializationScheme.initialiseSerialization()
|
AMQPClientSerializationScheme.initialiseSerialization()
|
||||||
val handle = RPCClient<RPCOps>(hostAndPort, null, serializationContext = KRYO_RPC_CLIENT_CONTEXT).start(rpcClass, username, password)
|
val handle = RPCClient<RPCOps>(hostAndPort, null, serializationContext = AMQP_RPC_CLIENT_CONTEXT).start(rpcClass, username, password)
|
||||||
val callGenerators = rpcClass.declaredMethods.map { method ->
|
val callGenerators = rpcClass.declaredMethods.map { method ->
|
||||||
Generator.sequence(method.parameters.map {
|
Generator.sequence(method.parameters.map {
|
||||||
generatorStore[it.type] ?: throw Exception("No generator for ${it.type}")
|
generatorStore[it.type] ?: throw Exception("No generator for ${it.type}")
|
||||||
|
@ -2,7 +2,7 @@ package net.corda.smoketesting
|
|||||||
|
|
||||||
import net.corda.client.rpc.CordaRPCClient
|
import net.corda.client.rpc.CordaRPCClient
|
||||||
import net.corda.client.rpc.CordaRPCConnection
|
import net.corda.client.rpc.CordaRPCConnection
|
||||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.core.internal.*
|
import net.corda.core.internal.*
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -59,7 +59,7 @@ class NodeProcess(
|
|||||||
val javaPath: Path = Paths.get(System.getProperty("java.home"), "bin", "java")
|
val javaPath: Path = Paths.get(System.getProperty("java.home"), "bin", "java")
|
||||||
val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(systemDefault())
|
val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(systemDefault())
|
||||||
val defaultNetworkParameters = run {
|
val defaultNetworkParameters = run {
|
||||||
KryoClientSerializationScheme.createSerializationEnv().asContextEnv {
|
AMQPClientSerializationScheme.createSerializationEnv().asContextEnv {
|
||||||
// There are no notaries in the network parameters for smoke test nodes. If this is required then we would
|
// There are no notaries in the network parameters for smoke test nodes. If this is required then we would
|
||||||
// need to introduce the concept of a "network" which predefines the notaries, like the driver and MockNetwork
|
// need to introduce the concept of a "network" which predefines the notaries, like the driver and MockNetwork
|
||||||
NetworkParametersCopier(testNetworkParameters())
|
NetworkParametersCopier(testNetworkParameters())
|
||||||
|
@ -3,6 +3,7 @@ package net.corda.testing.core
|
|||||||
import com.nhaarman.mockito_kotlin.any
|
import com.nhaarman.mockito_kotlin.any
|
||||||
import com.nhaarman.mockito_kotlin.doAnswer
|
import com.nhaarman.mockito_kotlin.doAnswer
|
||||||
import com.nhaarman.mockito_kotlin.whenever
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
|
import net.corda.core.DoNotImplement
|
||||||
import net.corda.core.internal.staticField
|
import net.corda.core.internal.staticField
|
||||||
import net.corda.core.serialization.internal.SerializationEnvironment
|
import net.corda.core.serialization.internal.SerializationEnvironment
|
||||||
import net.corda.core.serialization.internal.effectiveSerializationEnv
|
import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||||
|
@ -2,13 +2,13 @@ package net.corda.testing.internal
|
|||||||
|
|
||||||
import com.nhaarman.mockito_kotlin.doNothing
|
import com.nhaarman.mockito_kotlin.doNothing
|
||||||
import com.nhaarman.mockito_kotlin.whenever
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
|
||||||
import net.corda.core.DoNotImplement
|
import net.corda.core.DoNotImplement
|
||||||
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
|
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
||||||
import net.corda.core.serialization.internal.*
|
import net.corda.core.serialization.internal.*
|
||||||
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
||||||
|
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
|
||||||
import net.corda.nodeapi.internal.serialization.*
|
import net.corda.nodeapi.internal.serialization.*
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
|
||||||
import net.corda.testing.core.SerializationEnvironmentRule
|
import net.corda.testing.core.SerializationEnvironmentRule
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.ExecutorService
|
import java.util.concurrent.ExecutorService
|
||||||
@ -39,8 +39,8 @@ internal fun createTestSerializationEnv(label: String): SerializationEnvironment
|
|||||||
return object : SerializationEnvironmentImpl(
|
return object : SerializationEnvironmentImpl(
|
||||||
factory,
|
factory,
|
||||||
AMQP_P2P_CONTEXT,
|
AMQP_P2P_CONTEXT,
|
||||||
KRYO_RPC_SERVER_CONTEXT,
|
AMQP_RPC_SERVER_CONTEXT,
|
||||||
KRYO_RPC_CLIENT_CONTEXT,
|
AMQP_RPC_CLIENT_CONTEXT,
|
||||||
AMQP_STORAGE_CONTEXT,
|
AMQP_STORAGE_CONTEXT,
|
||||||
KRYO_CHECKPOINT_CONTEXT
|
KRYO_CHECKPOINT_CONTEXT
|
||||||
) {
|
) {
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
package net.corda.demobench
|
package net.corda.demobench
|
||||||
|
|
||||||
import javafx.scene.image.Image
|
import javafx.scene.image.Image
|
||||||
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme
|
||||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.demobench.views.DemoBenchView
|
import net.corda.demobench.views.DemoBenchView
|
||||||
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
|
||||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
|
||||||
import tornadofx.*
|
import tornadofx.*
|
||||||
import java.io.InputStreamReader
|
import java.io.InputStreamReader
|
||||||
import java.nio.charset.StandardCharsets.UTF_8
|
import java.nio.charset.StandardCharsets.UTF_8
|
||||||
@ -60,7 +60,7 @@ class DemoBench : App(DemoBenchView::class) {
|
|||||||
nodeSerializationEnv = SerializationEnvironmentImpl(
|
nodeSerializationEnv = SerializationEnvironmentImpl(
|
||||||
SerializationFactoryImpl().apply {
|
SerializationFactoryImpl().apply {
|
||||||
registerScheme(KryoClientSerializationScheme())
|
registerScheme(KryoClientSerializationScheme())
|
||||||
registerScheme(AMQPClientSerializationScheme())
|
registerScheme(AMQPClientSerializationScheme(emptyList()))
|
||||||
},
|
},
|
||||||
AMQP_P2P_CONTEXT)
|
AMQP_P2P_CONTEXT)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user