From f850daa5828e4504ad6831a7bdc95e8db4d7e99b Mon Sep 17 00:00:00 2001 From: Kat Baker Date: Thu, 3 May 2018 16:14:58 +0100 Subject: [PATCH 1/2] CORDA-847 - AMQP RPC * Client and server support for amqp * Observable (and supporting) serialisers Unit Tests * Fixing tests * Test fixes * CORDA-847 - Update api doc with additon of @CordaSerializable annotation * TestFixes * review comments * TestFixes * Test Fix * Test Fix * Test Fix * Test Fix * Test Fix * Test Fix * TestFix * Test Fix * Review Comments --- .ci/api-current.txt | 14 ++ .../client/rpc/BlacklistKotlinClosureTest.kt | 4 +- .../net/corda/client/rpc/CordaRPCClient.kt | 14 +- .../rpc/internal/RPCClientProxyHandler.kt | 2 +- .../amqp/AMQPClientSerializationScheme.kt | 63 ++++++++ .../amqp/RpcClientCordaFutureSerializer.kt | 35 +++++ .../amqp/RpcClientObservableSerializer.kt | 127 ++++++++++++++++ .../kryo/KryoClientSerializationScheme.kt | 15 +- .../kryo/RpcClientObservableSerializer.kt | 9 +- .../rpc/ClientRPCInfrastructureTests.kt | 9 +- .../net/corda/client/rpc/RPCFailureTests.kt | 18 ++- .../net/corda/core/concurrent/CordaFuture.kt | 2 + .../net/corda/core/messaging/FlowHandle.kt | 1 + .../core/serialization/SerializationAPI.kt | 9 ++ .../SerializationCustomSerializer.kt | 2 +- .../exceptions/InternalNodeException.kt | 4 +- .../internal/network/NetworkBootstrapper.kt | 16 +- .../engine/ConnectionStateMachine.kt | 2 +- .../internal/serialization/ClientContexts.kt | 8 +- .../serialization/SerializationScheme.kt | 2 +- .../internal/serialization/ServerContexts.kt | 7 - .../amqp/AMQPSerializationScheme.kt | 61 ++------ .../amqp/custom/ClassSerializer.kt | 1 - .../amqp/custom/InputStreamSerializer.kt | 2 +- .../amqp/custom/RxNotificationSerializer.kt | 28 ++++ .../internal/crypto/X509UtilitiesTest.kt | 10 +- .../amqp/SerializationOutputTests.kt | 1 + .../internal/serialization/kryo/KryoTests.kt | 6 +- .../kotlin/net/corda/node/BootTests.kt | 9 +- .../services/rpc/RpcExceptionHandlingTest.kt | 33 +++-- .../kotlin/net/corda/node/internal/Node.kt | 6 +- .../amqp/AMQPServerSerializationScheme.kt | 46 ++++++ .../amqp/RpcServerCordaFutureSerialiser.kt | 35 +++++ .../amqp/RpcServerObservableSerializer.kt | 140 ++++++++++++++++++ .../kryo/RpcServerObservableSerializer.kt | 2 +- .../messaging/ObservableContextInterface.kt | 16 ++ .../node/services/messaging/RPCServer.kt | 26 ++-- .../node/services/network/NodeInfoWatcher.kt | 2 +- .../RoundTripObservableSerializerTests.kt | 100 +++++++++++++ .../RpcServerObservableSerializerTests.kt | 87 +++++++++++ .../testutils/AMQPTestSerialiationScheme.kt | 54 +++++++ .../testutils/TestObservableContext.kt | 18 +++ .../testutils/TestSerializationContext.kt | 17 +++ .../testutils/TestSerializerFactoryFactory.kt | 11 ++ .../corda/testing/node/internal/RPCDriver.kt | 8 +- .../net/corda/smoketesting/NodeProcess.kt | 4 +- .../testing/core/SerializationTestHelpers.kt | 1 + .../InternalSerializationTestHelpers.kt | 10 +- .../kotlin/net/corda/demobench/DemoBench.kt | 4 +- 49 files changed, 949 insertions(+), 152 deletions(-) create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt create mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt create mode 100644 node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt create mode 100644 node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt create mode 100644 node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt create mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index b088f61edb..107c0b060f 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -76,10 +76,16 @@ public final class net.corda.core.concurrent.ConcurrencyUtils extends java.lang. @NotNull public static final String shortCircuitedTaskFailedMessage = "Short-circuited task failed:" ## +<<<<<<< HEAD public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future public abstract void then(kotlin.jvm.functions.Function1, ? extends W>) @NotNull public abstract java.util.concurrent.CompletableFuture toCompletableFuture() +======= +@net.corda.core.serialization.CordaSerializable public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future + public abstract void then(kotlin.jvm.functions.Function1) + @org.jetbrains.annotations.NotNull public abstract concurrent.CompletableFuture toCompletableFuture() +>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation ## @CordaSerializable public final class net.corda.core.context.Actor extends java.lang.Object @@ -2615,8 +2621,12 @@ public final class net.corda.core.messaging.DataFeed extends java.lang.Object public int hashCode() public String toString() ## +<<<<<<< HEAD @DoNotImplement public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable +======= +@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable +>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation public abstract void close() @NotNull public abstract net.corda.core.flows.StateMachineRunId getId() @@ -2642,8 +2652,12 @@ public final class net.corda.core.messaging.FlowHandleImpl extends java.lang.Obj public int hashCode() public String toString() ## +<<<<<<< HEAD @DoNotImplement public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle +======= +@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle +>>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation public abstract void close() @NotNull public abstract rx.Observable getProgress() diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt index a223cf7210..5a68349f85 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt @@ -32,9 +32,9 @@ class BlacklistKotlinClosureTest { driver(DriverParameters(startNodesInProcess = true)) { val rpc = startNode(providedName = ALICE_NAME).getOrThrow().rpc val packet = Packet { EVIL } - assertThatExceptionOfType(KryoException::class.java) + assertThatExceptionOfType(RPCException::class.java) .isThrownBy { rpc.startFlow(::FlowC, packet) } - .withMessageContaining("is not annotated or on the whitelist, so cannot be used in serialization") + .withMessageContaining("is not on the whitelist or annotated with @CordaSerializable") } } } \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 6080107d3d..4eb3dc4d83 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -1,6 +1,6 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Actor @@ -11,7 +11,7 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.config.SSLConfiguration -import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT import java.time.Duration /** @@ -111,7 +111,9 @@ class CordaRPCClient private constructor( private val haAddressPool: List = emptyList() ) { @JvmOverloads - constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null) + constructor(hostAndPort: NetworkHostAndPort, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) + : this(hostAndPort, configuration, null) /** * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. @@ -146,7 +148,7 @@ class CordaRPCClient private constructor( effectiveSerializationEnv } catch (e: IllegalStateException) { try { - KryoClientSerializationScheme.initialiseSerialization(classLoader) + AMQPClientSerializationScheme.initialiseSerialization() } catch (e: IllegalStateException) { // Race e.g. two of these constructed in parallel, ignore. } @@ -158,12 +160,12 @@ class CordaRPCClient private constructor( RPCClient( tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), configuration, - if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + if (classLoader != null) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else AMQP_RPC_CLIENT_CONTEXT) } else { RPCClient(haAddressPool, sslConfiguration, configuration, - if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + if (classLoader != null) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else AMQP_RPC_CLIENT_CONTEXT) } } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index d13116daed..c42138e3fe 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -10,7 +10,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion -import net.corda.client.rpc.internal.serialization.kryo.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt new file mode 100644 index 0000000000..c82779fc03 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt @@ -0,0 +1,63 @@ +package net.corda.client.rpc.internal.serialization.amqp + +import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationCustomSerializer +import net.corda.core.serialization.internal.SerializationEnvironment +import net.corda.core.serialization.internal.SerializationEnvironmentImpl +import net.corda.core.serialization.internal.nodeSerializationEnv +import net.corda.nodeapi.internal.serialization.* +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic +import java.util.concurrent.ConcurrentHashMap +import net.corda.nodeapi.internal.serialization.amqp.custom.RxNotificationSerializer + +/** + * When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation. + * This scheme is for use by the RPC Client calls. + */ +class AMQPClientSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> + ) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + @Suppress("UNUSED") + constructor() : this(emptySet(), ConcurrentHashMap()) + + companion object { + /** Call from main only. */ + fun initialiseSerialization() { + nodeSerializationEnv = createSerializationEnv() + } + + fun createSerializationEnv(): SerializationEnvironment { + return SerializationEnvironmentImpl( + SerializationFactoryImpl().apply { + registerScheme(AMQPClientSerializationScheme(emptyList())) + }, + storageContext = AMQP_STORAGE_CONTEXT, + p2pContext = AMQP_P2P_CONTEXT, + rpcClientContext = AMQP_RPC_CLIENT_CONTEXT, + rpcServerContext = AMQP_RPC_SERVER_CONTEXT) + } + } + + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase) = + magic == amqpMagic && ( + target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P) + + override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { + return SerializerFactory(context.whitelist, ClassLoader.getSystemClassLoader()).apply { + register(RpcClientObservableSerializer) + register(RpcClientCordaFutureSerializer(this)) + register(RxNotificationSerializer(this)) + } + } + + override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { + throw UnsupportedOperationException() + } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt new file mode 100644 index 0000000000..258a2d66ec --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt @@ -0,0 +1,35 @@ +package net.corda.client.rpc.internal.serialization.amqp + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.toFuture +import net.corda.core.toObservable +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import rx.Observable +import java.io.NotSerializableException + +/** + * Serializer for [CordaFuture] instances that can only deserialize such objects (just as the server + * side can only serialize them). Futures will have been converted to an Rx [Observable] for serialization. + */ +class RpcClientCordaFutureSerializer (factory: SerializerFactory) + : CustomSerializer.Proxy, RpcClientCordaFutureSerializer.FutureProxy>( + CordaFuture::class.java, + RpcClientCordaFutureSerializer.FutureProxy::class.java, factory +) { + override fun fromProxy(proxy: FutureProxy): CordaFuture<*> { + try { + return proxy.observable.toFuture() + } catch (e: NotSerializableException) { + throw NotSerializableException("Failed to deserialize Future from proxy Observable - ${e.message}\n").apply { + initCause(e.cause) + } + } + } + + override fun toProxy(obj: CordaFuture<*>): FutureProxy { + throw UnsupportedOperationException() + } + + data class FutureProxy(val observable: Observable<*>) +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt new file mode 100644 index 0000000000..18a73afa71 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt @@ -0,0 +1,127 @@ +package net.corda.client.rpc.internal.serialization.amqp + + +import net.corda.client.rpc.internal.ObservableContext +import net.corda.core.context.Trace +import net.corda.core.serialization.SerializationContext +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.internal.serialization.amqp.* +import org.apache.qpid.proton.codec.Data +import rx.Notification +import rx.Observable +import rx.subjects.UnicastSubject +import java.io.NotSerializableException +import java.lang.reflect.Type +import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger +import javax.transaction.NotSupportedException + +/** + * Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, + * just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized, + * what is actually sent is a reference to the observable that can then be subscribed to. + */ +object RpcClientObservableSerializer : CustomSerializer.Implements>(Observable::class.java) { + private object RpcObservableContextKey + + fun createContext( + serializationContext: SerializationContext, + observableContext: ObservableContext + ) = serializationContext.withProperty(RpcObservableContextKey, observableContext) + + private fun pinInSubscriptions(observable: Observable, hardReferenceStore: MutableSet>): Observable { + val refCount = AtomicInteger(0) + return observable.doOnSubscribe { + if (refCount.getAndIncrement() == 0) { + require(hardReferenceStore.add(observable)) { + "Reference store already contained reference $this on add" + } + } + }.doOnUnsubscribe { + if (refCount.decrementAndGet() == 0) { + require(hardReferenceStore.remove(observable)) { + "Reference store did not contain reference $this on remove" + } + } + } + } + + override val schemaForDocumentation = Schema( + listOf( + CompositeType( + name = type.toString(), + label = "", + provides = emptyList(), + descriptor = descriptor, + fields = listOf( + Field( + name = "observableId", + type = "string", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false), + Field( + name = "observableInstant", + type = "long", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false) + )))) + + /** + * Converts the serialized form, a blob, back into an Observable + */ + override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput, + context: SerializationContext + ): Observable<*> { + if (RpcObservableContextKey !in context.properties) { + throw NotSerializableException("Missing Observable Context Key on Client Context") + } + + val observableContext = + context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext + + if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list") + if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}") + + val observableId: Trace.InvocationId = Trace.InvocationId((obj[0] as String), Instant.ofEpochMilli((obj[1] as Long))) + val observable = UnicastSubject.create>() + + require(observableContext.observableMap.getIfPresent(observableId) == null) { + "Multiple Observables arrived with the same ID $observableId" + } + + val rpcCallSite = getRpcCallSite(context, observableContext) + + observableContext.observableMap.put(observableId, observable) + observableContext.callSiteMap?.put(observableId, rpcCallSite) + + // We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users + // don't need to store a reference to the Observables themselves. + return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe { + // This causes Future completions to give warnings because the corresponding OnComplete sent from the server + // will arrive after the client unsubscribes from the observable and consequently invalidates the mapping. + // The unsubscribe is due to [ObservableToFuture]'s use of first(). + observableContext.observableMap.invalidate(observableId) + }.dematerialize() + } + + private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): Throwable? { + val rpcRequestOrObservableId = context.properties[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId + return observableContext.callSiteMap?.get(rpcRequestOrObservableId) + } + + override fun writeDescribedObject( + obj: Observable<*>, + data: Data, + type: Type, + output: SerializationOutput, + context: SerializationContext + ) { + throw NotSupportedException() + } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt index 06ec72e244..5a5397ca6d 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt @@ -1,20 +1,25 @@ package net.corda.client.rpc.internal.serialization.kryo import com.esotericsoftware.kryo.pool.KryoPool +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.core.serialization.internal.SerializationEnvironment import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT -import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT -import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.nodeapi.internal.serialization.* import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.nodeapi.internal.serialization.kryo.RPCKryo +private val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic, + net.corda.core.serialization.SerializationDefaults.javaClass.classLoader, + GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), + emptyMap(), + true, + SerializationContext.UseCase.RPCClient, + null) + class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt index 99749093e2..c69081a667 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt @@ -15,12 +15,15 @@ import java.time.Instant import java.util.concurrent.atomic.AtomicInteger /** - * A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext]. + * A [Serializer] to deserialize Observables once the corresponding Kryo instance has been provided with an [ObservableContext]. */ object RpcClientObservableSerializer : Serializer>() { private object RpcObservableContextKey - fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext { + fun createContext( + serializationContext: SerializationContext, + observableContext: ObservableContext + ): SerializationContext { return serializationContext.withProperty(RpcObservableContextKey, observableContext) } @@ -72,4 +75,4 @@ object RpcClientObservableSerializer : Serializer>() { val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId return observableContext.callSiteMap?.get(rpcRequestOrObservableId) } -} +} \ No newline at end of file diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt index 43cd0c1ce1..d2a0a2c977 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt @@ -1,5 +1,6 @@ package net.corda.client.rpc +import net.corda.core.CordaRuntimeException import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture @@ -11,6 +12,7 @@ import net.corda.testing.node.internal.RPCDriverDSL import net.corda.testing.node.internal.rpcDriver import net.corda.testing.node.internal.rpcTestUser import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -77,9 +79,10 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() { // Does nothing, doesn't throw. proxy.void() - assertEquals("Barf!", assertFailsWith { - proxy.barf() - }.message) + assertThatThrownBy { proxy.barf() } + .isInstanceOf(CordaRuntimeException::class.java) + .hasMessage("java.lang.IllegalArgumentException: Barf!") + assertEquals("hi 5", proxy.someCalculation("hi", 5)) } diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt index fec7adae5d..2ab308af1d 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt @@ -1,6 +1,6 @@ package net.corda.client.rpc -import com.esotericsoftware.kryo.KryoException +import net.corda.core.CordaRuntimeException import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.* @@ -48,23 +48,29 @@ class RPCFailureTests { @Test fun `kotlin NPE`() = rpc { - assertThatThrownBy { it.kotlinNPE() }.isInstanceOf(KotlinNullPointerException::class.java) + assertThatThrownBy { it.kotlinNPE() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("kotlin.KotlinNullPointerException") } @Test fun `kotlin NPE async`() = rpc { val future = it.kotlinNPEAsync() - assertThatThrownBy { future.getOrThrow() }.isInstanceOf(KotlinNullPointerException::class.java) + assertThatThrownBy { future.getOrThrow() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("kotlin.KotlinNullPointerException") } @Test - fun unserializable() = rpc { - assertThatThrownBy { it.getUnserializable() }.isInstanceOf(KryoException::class.java) + fun `unserializable`() = rpc { + assertThatThrownBy { it.getUnserializable() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("java.io.NotSerializableException:") + .hasMessageContaining("Unserializable is not on the whitelist or annotated with @CordaSerializable.") } @Test fun `unserializable async`() = rpc { val future = it.getUnserializableAsync() - assertThatThrownBy { future.getOrThrow() }.isInstanceOf(KryoException::class.java) + assertThatThrownBy { future.getOrThrow() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("java.io.NotSerializableException:") + .hasMessageContaining("Unserializable is not on the whitelist or annotated with @CordaSerializable.") } } diff --git a/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt b/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt index 4977f3a34b..2f6d95795a 100644 --- a/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt +++ b/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt @@ -1,5 +1,6 @@ package net.corda.core.concurrent +import net.corda.core.serialization.CordaSerializable import java.util.concurrent.CompletableFuture import java.util.concurrent.Future @@ -7,6 +8,7 @@ import java.util.concurrent.Future * Same as [Future] with additional methods to provide some of the features of [java.util.concurrent.CompletableFuture] while minimising the API surface area. * In Kotlin, to avoid compile errors, whenever CordaFuture is used in a parameter or extension method receiver type, its type parameter should be specified with out variance. */ +@CordaSerializable interface CordaFuture : Future { /** * Run the given callback when this future is done, on the completion thread. diff --git a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt index ad4d40ec16..4d540d69c8 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt @@ -10,6 +10,7 @@ import rx.Observable * [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. */ @DoNotImplement +@CordaSerializable interface FlowHandle : AutoCloseable { /** * The started state machine's ID. diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt index dd94c74898..c9b8cc686c 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt @@ -185,6 +185,15 @@ interface SerializationContext { enum class UseCase { P2P, RPCServer, RPCClient, Storage, Checkpoint, Testing } } +/** + * Set of well known properties that may be set on a serialization context. This doesn't preclude + * others being set that aren't keyed on this enumeration, but for general use properties adding a + * well known key here is preferred. + */ +enum class ContextPropertyKeys { + SERIALIZERS +} + /** * Global singletons to be used as defaults that are injected elsewhere (generally, in the node or in RPC client). */ diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt index 3de86d449d..05852302ce 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt @@ -6,7 +6,7 @@ package net.corda.core.serialization * a proxy serializer can be written that extends this type whose purpose is to move between those an * unserializable types and an intermediate representation. * - * NOTE: The proxy object should be specified as a seperate class. However, this can be defined within the + * NOTE: The proxy object should be specified as a separate class. However, this can be defined within the * scope of the custom serializer. */ interface SerializationCustomSerializer { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt b/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt index 0751e2681a..78e6dee579 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt @@ -28,7 +28,9 @@ class InternalNodeException(message: String) : CordaRuntimeException(message) { (wrapped as? CordaRuntimeException)?.setCause(null) return when { whitelisted.any { it.isInstance(wrapped) } -> wrapped - else -> InternalNodeException(DEFAULT_MESSAGE) + else -> InternalNodeException(DEFAULT_MESSAGE).apply { + stackTrace = emptyArray() + } } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index 395c08ddfa..30958d8546 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -23,10 +23,11 @@ import net.corda.nodeapi.internal.ContractsJarFile import net.corda.nodeapi.internal.DEV_ROOT_CA import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX -import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.CordaSerializationMagic +import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import java.nio.file.Path @@ -278,7 +279,7 @@ class NetworkBootstrapper { _contextSerializationEnv.set(SerializationEnvironmentImpl( SerializationFactoryImpl().apply { registerScheme(KryoParametersSerializationScheme) - registerScheme(AMQPServerSerializationScheme()) + registerScheme(AMQPParametersSerializationScheme) }, AMQP_P2P_CONTEXT) ) @@ -292,4 +293,13 @@ class NetworkBootstrapper { override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException() override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException() } + + private object AMQPParametersSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) { + override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException() + override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException() + + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return magic == amqpMagic && target == SerializationContext.UseCase.P2P + } + } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index c3a3fcce91..bb079ce7e4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -352,7 +352,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, val connection = event.connection val channel = connection?.context as? Channel if (channel != null) { - val appProperties = HashMap(amqpMessage.applicationProperties.value as Map) + val appProperties = HashMap(amqpMessage.applicationProperties.value) appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName val localAddress = channel.localAddress() as InetSocketAddress val remoteAddress = channel.remoteAddress() as InetSocketAddress diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt index e4e2f53417..69a3efd3e7 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt @@ -13,13 +13,7 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic * servers from trying to instantiate any of them. */ -val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.RPCClient, - null) + val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index a94aad8786..2163459104 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -115,7 +115,7 @@ open class SerializationFactoryImpl( return schemes.computeIfAbsent(lookupKey) { registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single? logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes") - throw UnsupportedOperationException("Serialization scheme not supported.") + throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.") } to magic } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index ae2e8cdb67..2eb7485d6a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -17,13 +17,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic * MUST be kept separate! */ -val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.RPCServer, - null) val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 505228a377..edba2095ed 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -5,6 +5,7 @@ package net.corda.nodeapi.internal.serialization.amqp import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner import net.corda.core.cordapp.Cordapp import net.corda.core.internal.objectOrNewInstance +import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.CordaSerializationMagic @@ -12,7 +13,6 @@ import net.corda.nodeapi.internal.serialization.DefaultWhitelist import net.corda.nodeapi.internal.serialization.MutableClassWhitelist import net.corda.nodeapi.internal.serialization.SerializationScheme import java.lang.reflect.Modifier -import java.security.PublicKey import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -118,6 +118,12 @@ abstract class AbstractAMQPSerializationScheme( factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) } } + + context.properties[ContextPropertyKeys.SERIALIZERS]?.apply { + uncheckedCast>>(this).forEach { + factory.register(it) + } + } } /* @@ -131,7 +137,9 @@ abstract class AbstractAMQPSerializationScheme( protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory - protected open val publicKeySerializer: CustomSerializer.Implements = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer + + // Not used as a simple direct import to facilitate testing + open val publicKeySerializer : CustomSerializer<*> = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer private fun getSerializerFactory(context: SerializationContext): SerializerFactory { return serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) { @@ -162,52 +170,3 @@ abstract class AbstractAMQPSerializationScheme( protected fun canDeserializeVersion(magic: CordaSerializationMagic) = magic == amqpMagic } - -// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented -class AMQPServerSerializationScheme( - cordappCustomSerializers: Set>, - serializerFactoriesForContexts: MutableMap, SerializerFactory> -) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { - constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) - - constructor() : this(emptySet(), ConcurrentHashMap()) - - override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return canDeserializeVersion(magic) && - (target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage) - } - -} - -// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented -class AMQPClientSerializationScheme( - cordappCustomSerializers: Set>, - serializerFactoriesForContexts: MutableMap, SerializerFactory> -) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { - constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) - - constructor() : this(emptySet(), ConcurrentHashMap()) - - override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return canDeserializeVersion(magic) && - (target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage) - } - -} - diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt index 99dd1b9ff6..7c399d5a68 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt @@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.serialization.amqp.custom import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory -import net.corda.nodeapi.internal.serialization.amqp.custom.ClassSerializer.ClassProxy /** * A serializer for [Class] that uses [ClassProxy] proxy object to write out diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt index faa4f5eebf..0ab61b41c8 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt @@ -39,7 +39,7 @@ object InputStreamSerializer : CustomSerializer.Implements(InputStr override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput, context: SerializationContext - ): InputStream { + ) : InputStream { val bits = input.readObject(obj, schemas, ByteArray::class.java, context) as ByteArray return bits.inputStream() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt new file mode 100644 index 0000000000..07dcd561a8 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt @@ -0,0 +1,28 @@ +package net.corda.nodeapi.internal.serialization.amqp.custom + +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import rx.Notification + +class RxNotificationSerializer( + factory: SerializerFactory +) : CustomSerializer.Proxy, RxNotificationSerializer.Proxy>( + Notification::class.java, + Proxy::class.java, + factory +) { + data class Proxy( + val kind: Notification.Kind, + val t: Throwable?, + val value: Any?) + + override fun toProxy(obj: Notification<*>) = Proxy(obj.kind, obj.throwable, obj.value) + + override fun fromProxy(proxy: Proxy): Notification<*> { + return when (proxy.kind) { + Notification.Kind.OnCompleted -> Notification.createOnCompleted() + Notification.Kind.OnError -> Notification.createOnError(proxy.t) + Notification.Kind.OnNext -> Notification.createOnNext(proxy.value) + } + } +} \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt index 5986848c74..5508e857d9 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt @@ -8,12 +8,14 @@ import net.corda.core.internal.div import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.SerializationContextImpl import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME @@ -335,8 +337,8 @@ class X509UtilitiesTest { @Test fun `serialize - deserialize X509Certififcate`() { - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } - val context = SerializationContextImpl(kryoMagic, + val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) } + val context = SerializationContextImpl(amqpMagic, javaClass.classLoader, AllWhitelist, emptyMap(), @@ -351,8 +353,8 @@ class X509UtilitiesTest { @Test fun `serialize - deserialize X509CertPath`() { - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } - val context = SerializationContextImpl(kryoMagic, + val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) } + val context = SerializationContextImpl(amqpMagic, javaClass.classLoader, AllWhitelist, emptyMap(), diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt index d3bea49d1b..468a46a465 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt @@ -30,6 +30,7 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationFactory import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.OpaqueBytes +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA import net.corda.nodeapi.internal.crypto.ContentSignerBuilder import net.corda.nodeapi.internal.serialization.AllWhitelist diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index 7f8a8c302b..860c7b64c0 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -12,6 +12,7 @@ import net.corda.core.contracts.PrivacySalt import net.corda.core.crypto.* import net.corda.core.internal.FetchDataFlow import net.corda.core.serialization.* +import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence import net.corda.node.serialization.kryo.KryoServerSerializationScheme @@ -76,11 +77,12 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null)) } + @Test fun `serialised form is stable when the same object instance is added to the deserialised object graph`() { val noReferencesContext = context.withoutReferences() - val obj = Ints.toByteArray(0x01234567).sequence() - val originalList = arrayListOf(obj) + val obj : ByteSequence = Ints.toByteArray(0x01234567).sequence() + val originalList : ArrayList = arrayListOf(obj) val deserialisedList = originalList.serialize(factory, noReferencesContext).deserialize(factory, noReferencesContext) originalList += obj deserialisedList += obj diff --git a/node/src/integration-test/kotlin/net/corda/node/BootTests.kt b/node/src/integration-test/kotlin/net/corda/node/BootTests.kt index da6c55249c..375c96f0b7 100644 --- a/node/src/integration-test/kotlin/net/corda/node/BootTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/BootTests.kt @@ -2,6 +2,7 @@ package net.corda.node import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient +import net.corda.core.CordaRuntimeException import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC import net.corda.core.internal.div @@ -11,6 +12,7 @@ import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.node.internal.NodeStartup import net.corda.node.services.Permissions.Companion.startFlow +import net.corda.nodeapi.exceptions.InternalNodeException import net.corda.testing.common.internal.ProjectStructure.projectRootDir import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters @@ -28,8 +30,11 @@ class BootTests { fun `java deserialization is disabled`() { driver { val user = User("u", "p", setOf(startFlow())) - val future = CordaRPCClient(startNode(rpcUsers = listOf(user)).getOrThrow().rpcAddress).start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue - assertThatThrownBy { future.getOrThrow() }.isInstanceOf(InvalidClassException::class.java).hasMessage("filter status: REJECTED") + val future = CordaRPCClient(startNode(rpcUsers = listOf(user)).getOrThrow().rpcAddress). + start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue + assertThatThrownBy { future.getOrThrow() } + .isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining(InternalNodeException.defaultMessage()) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt index 21c6be7f50..d6714f506f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt @@ -33,12 +33,12 @@ class RpcExceptionHandlingTest { val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow() - assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() } - .isInstanceOfSatisfying(InternalNodeException::class.java) { exception -> - assertThat(exception).hasNoCause() - assertThat(exception.stackTrace).isEmpty() - assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) - } + assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception -> + + assertThat(exception).hasNoCause() + assertThat(exception.stackTrace).isEmpty() + assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) + } } } @@ -49,12 +49,12 @@ class RpcExceptionHandlingTest { val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow() val clientRelevantMessage = "This is for the players!" - assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() } - .isInstanceOfSatisfying(ClientRelevantException::class.java) { exception -> - assertThat(exception).hasNoCause() - assertThat(exception.stackTrace).isEmpty() - assertThat(exception.message).isEqualTo(clientRelevantMessage) - } + assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception -> + + assertThat(exception).hasNoCause() + assertThat(exception.stackTrace).isEmpty() + assertThat(exception.message).isEqualTo(clientRelevantMessage) + } } } @@ -81,10 +81,11 @@ class RpcExceptionHandlingTest { assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() } .isInstanceOfSatisfying(InternalNodeException::class.java) { exception -> - assertThat(exception).hasNoCause() - assertThat(exception.stackTrace).isEmpty() - assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) - } + + assertThat(exception).hasNoCause() + assertThat(exception.stackTrace).isEmpty() + assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) + } } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 16dcdeec41..26fa515e6d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -26,6 +26,7 @@ import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService @@ -42,7 +43,6 @@ import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.bridging.BridgeControlListener import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.serialization.* -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Scheduler @@ -389,10 +389,10 @@ open class Node(configuration: NodeConfiguration, registerScheme(KryoClientSerializationScheme()) }, p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), - rpcServerContext = KRYO_RPC_SERVER_CONTEXT.withClassLoader(classloader), + rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader), - rpcClientContext = if (configuration.shouldInitCrashShell()) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node + rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node } private var rpcMessagingClient: RPCMessagingClient? = null diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt new file mode 100644 index 0000000000..b645b1289d --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt @@ -0,0 +1,46 @@ +package net.corda.node.serialization.amqp + +import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationCustomSerializer +import net.corda.nodeapi.internal.serialization.CordaSerializationMagic +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.amqp.custom.RxNotificationSerializer + +import java.util.concurrent.ConcurrentHashMap + +/** + * When set as the serialization scheme, defines the RPC Server serialization scheme as using the Corda + * AMQP implementation. + */ +class AMQPServerSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> +) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + constructor() : this(emptySet(), ConcurrentHashMap()) + + override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { + throw UnsupportedOperationException() + } + + override fun rpcServerSerializerFactory(context: SerializationContext) = + SerializerFactory( + context.whitelist, + context.deserializationClassLoader + ).apply { + register(RpcServerObservableSerializer()) + register(RpcServerCordaFutureSerializer(this)) + register(RxNotificationSerializer(this)) + } + + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return canDeserializeVersion(magic) && + ( target == SerializationContext.UseCase.P2P + || target == SerializationContext.UseCase.Storage + || target == SerializationContext.UseCase.RPCServer) + } +} diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt new file mode 100644 index 0000000000..f66e3c5daf --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt @@ -0,0 +1,35 @@ +package net.corda.node.serialization.amqp + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.toObservable +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import rx.Observable +import java.io.NotSerializableException + +/** + * Serializer for [CordaFuture] objects where Futures are converted to Observables and + * are thus dealt with by the [RpcServerObservableSerializer] + */ +class RpcServerCordaFutureSerializer(factory: SerializerFactory) + : CustomSerializer.Proxy, + RpcServerCordaFutureSerializer.FutureProxy>( + CordaFuture::class.java, RpcServerCordaFutureSerializer.FutureProxy::class.java, factory +) { + override fun fromProxy(proxy: RpcServerCordaFutureSerializer.FutureProxy): CordaFuture<*> { + throw UnsupportedOperationException() + } + + override fun toProxy(obj: CordaFuture<*>): RpcServerCordaFutureSerializer.FutureProxy { + try { + return FutureProxy(obj.toObservable()) + } catch (e: NotSerializableException) { + throw (NotSerializableException("Failed to serialize Future as proxy Observable - ${e.message}")) + } + } + + data class FutureProxy(val observable: Observable<*>) +} + + + diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt new file mode 100644 index 0000000000..312a4906cc --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt @@ -0,0 +1,140 @@ +package net.corda.node.serialization.amqp + +import net.corda.core.context.Trace +import net.corda.core.serialization.SerializationContext +import net.corda.core.utilities.loggerFor +import net.corda.node.services.messaging.ObservableContextInterface +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.internal.serialization.amqp.* +import org.apache.qpid.proton.codec.Data + +import rx.Notification +import rx.Observable +import rx.Subscriber +import java.io.NotSerializableException + +import java.lang.reflect.Type + +/** + * Server side serializer that notionally serializes RxObservables when used by the RPC + * framework for event subscriptions. Notional in the sense that the actual observable + * isn't serialized, rather a reference to the observable is, this is then used by + * the client side RPC handler to subscribe to the observable stream. + */ +class RpcServerObservableSerializer : CustomSerializer.Implements>( + Observable::class.java +) { + // Would be great to make this private, but then it's so much harder to unit test + object RpcObservableContextKey + + companion object { + fun createContext( + serializationContext: SerializationContext, + observableContext: ObservableContextInterface + ) = serializationContext.withProperty( + RpcServerObservableSerializer.RpcObservableContextKey, observableContext) + } + + override val schemaForDocumentation = Schema( + listOf( + CompositeType( + name = type.toString(), + label = "", + provides = emptyList(), + descriptor = descriptor, + fields = listOf( + Field( + name = "observableId", + type = "string", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false), + Field( + name = "observableInstant", + type = "long", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false) + )))) + + override fun readObject( + obj: Any, schemas: SerializationSchemas, + input: DeserializationInput, + context: SerializationContext + ): Observable<*> { + throw UnsupportedOperationException() + } + + override fun writeDescribedObject( + obj: Observable<*>, + data: Data, + type: Type, + output: SerializationOutput, + context: SerializationContext + ) { + val observableId = Trace.InvocationId.newInstance() + if (RpcServerObservableSerializer.RpcObservableContextKey !in context.properties) { + throw NotSerializableException("Missing Observable Key on serialization context - $type") + } + + val observableContext = context.properties[RpcServerObservableSerializer.RpcObservableContextKey] + as ObservableContextInterface + + data.withList { + data.putString(observableId.value) + data.putLong(observableId.timestamp.toEpochMilli()) + } + + val observableWithSubscription = ObservableSubscription( + subscription = obj.materialize().subscribe( + object : Subscriber>() { + override fun onNext(observation: Notification<*>) { + if (!isUnsubscribed) { + val message = RPCApi.ServerToClient.Observation( + id = observableId, + content = observation, + deduplicationIdentity = observableContext.deduplicationIdentity + ) + observableContext.sendMessage(message) + } + } + + override fun onError(exception: Throwable) { + loggerFor().error( + "onError called in materialize()d RPC Observable", exception) + } + + override fun onCompleted() { + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables != null) { + observables.remove(observableId) + if (observables.isEmpty()) { + null + } else { + observables + } + } else { + null + } + } + } + } + ) + ) + + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables == null) { + hashSetOf(observableId) + } else { + observables.add(observableId) + observables + } + } + observableContext.observableMap.put(observableId, observableWithSubscription) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt index 51013a9bce..a51448e83a 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt @@ -19,6 +19,7 @@ object RpcServerObservableSerializer : Serializer>() { private object RpcObservableContextKey private val log = LoggerFactory.getLogger(javaClass) + fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) } @@ -80,7 +81,6 @@ object RpcServerObservableSerializer : Serializer>() { } private fun Output.writeInvocationId(id: Trace.InvocationId) { - writeString(id.value) writeLong(id.timestamp.toEpochMilli()) } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt new file mode 100644 index 0000000000..ce0521d686 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt @@ -0,0 +1,16 @@ +package net.corda.node.services.messaging + +import com.github.benmanes.caffeine.cache.Cache +import net.corda.core.context.Trace +import net.corda.nodeapi.RPCApi +import org.apache.activemq.artemis.api.core.SimpleString +import java.util.concurrent.ConcurrentHashMap + +interface ObservableContextInterface { + fun sendMessage(serverToClient: RPCApi.ServerToClient) + + val observableMap: Cache + val clientAddressToObservables: ConcurrentHashMap> + val deduplicationIdentity: String + val clientAddress: SimpleString +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 7c9ed1cf86..7931bd158b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -15,13 +15,14 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.LifeCycle import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT import net.corda.core.serialization.deserialize import net.corda.core.utilities.* import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.RPCSecurityManager -import net.corda.node.serialization.kryo.RpcServerObservableSerializer import net.corda.node.services.logging.pushToLoggingContext +import net.corda.node.serialization.amqp.RpcServerObservableSerializer import net.corda.nodeapi.RPCApi import net.corda.nodeapi.externalTrace import net.corda.nodeapi.impersonatedActor @@ -45,6 +46,8 @@ import java.util.* import java.util.concurrent.* import kotlin.concurrent.thread +private typealias ObservableSubscriptionMap = Cache + data class RPCServerConfiguration( /** The number of threads to use for handling RPC requests */ val rpcThreadPoolSize: Int, @@ -410,15 +413,18 @@ class RPCServer( * muxed correctly. Note that the context construction itself is quite cheap. */ inner class ObservableContext( - val observableMap: ObservableSubscriptionMap, - val clientAddressToObservables: ConcurrentHashMap>, - val deduplicationIdentity: String, - val clientAddress: SimpleString - ) { - private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this) + override val observableMap: ObservableSubscriptionMap, + override val clientAddressToObservables: ConcurrentHashMap>, + override val deduplicationIdentity: String, + override val clientAddress: SimpleString + ) : ObservableContextInterface { + private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext( + observableContext = this, + serializationContext = SerializationDefaults.RPC_SERVER_CONTEXT) - fun sendMessage(serverToClient: RPCApi.ServerToClient) { - sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, serializationContextWithObservableContext, serverToClient)) + override fun sendMessage(serverToClient: RPCApi.ServerToClient) { + sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, + serializationContextWithObservableContext, serverToClient)) } } @@ -478,4 +484,4 @@ class ObservableSubscription( val subscription: Subscription ) -typealias ObservableSubscriptionMap = Cache + diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt index 2e30a1b711..2f54a90cea 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -10,12 +10,12 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.core.utilities.seconds +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NodeInfoFilesCopier import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import rx.Observable import rx.Scheduler import java.nio.file.Path diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt new file mode 100644 index 0000000000..e661007909 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -0,0 +1,100 @@ +package net.corda.node.internal.serialization + +import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext +import net.corda.core.internal.ThreadBox +import net.corda.core.context.Trace +import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme +import net.corda.node.internal.serialization.testutils.TestObservableContext as ServerObservableContext +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput +import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput + +import co.paralleluniverse.common.util.SameThreadExecutor +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.RemovalListener +import com.nhaarman.mockito_kotlin.mock +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.node.internal.serialization.testutils.serializationContext +import net.corda.node.serialization.amqp.RpcServerObservableSerializer +import net.corda.nodeapi.RPCApi +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Test +import rx.Notification +import rx.Observable +import rx.Subscription +import rx.subjects.UnicastSubject +import java.time.Instant +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit + +class RoundTripObservableSerializerTests { + private fun getID() = Trace.InvocationId("test1", Instant.now()) + + private fun subscriptionMap( + id: Trace.InvocationId + ) : Cache { + val subMap: Cache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) + .maximumSize(100) + .build() + + subMap.put(id, ObservableSubscription(mock())) + + return subMap + } + + private val observablesToReap = ThreadBox(object { + var observables = ArrayList() + }) + + private fun createRpcObservableMap(): Cache>> { + val onObservableRemove = RemovalListener>> { key, value, cause -> + val observableId = key!! + + observablesToReap.locked { observables.add(observableId) } + } + + return Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build() + } + + @Test + fun roundTripTest1() { + val serializationScheme = AMQPRoundTripRPCSerializationScheme(serializationContext) + + // Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap, + // the client as a property of the deserializer which, in the actual RPC client, is pulled off of + // the received message + val id : Trace.InvocationId = getID() + + val serverObservableContext = ServerObservableContext( + subscriptionMap(id), + clientAddressToObservables = ConcurrentHashMap(), + deduplicationIdentity = "thisIsATest", + clientAddress = SimpleString("clientAddress")) + + val serverSerializer = serializationScheme.rpcServerSerializerFactory(serverObservableContext) + + val clientObservableContext = ClientObservableContext( + callSiteMap = null, + observableMap = createRpcObservableMap(), + hardReferenceStore = Collections.synchronizedSet(mutableSetOf>()) + ) + + val clientSerializer = serializationScheme.rpcClientSerializerFactory(clientObservableContext, id) + + + // What we're actually going to serialize then deserialize + val obs = Observable.create({ 12 }) + + val serverSerializationContext = RpcServerObservableSerializer.createContext( + serializationContext, serverObservableContext) + + val clientSerializationContext = RpcClientObservableSerializer.createContext( + serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id) + + + val blob = SerializationOutput(serverSerializer).serialize(obs, serverSerializationContext) + val obs2 = DeserializationInput(clientSerializer).deserialize(blob, clientSerializationContext) + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt new file mode 100644 index 0000000000..17fa31d95c --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt @@ -0,0 +1,87 @@ +package net.corda.node.internal.serialization + +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import com.nhaarman.mockito_kotlin.mock +import net.corda.core.context.Trace +import net.corda.node.internal.serialization.testutils.* +import net.corda.node.serialization.amqp.RpcServerObservableSerializer +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.internal.serialization.AllWhitelist +import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory + +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Test +import rx.Observable +import rx.Subscription +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class RpcServerObservableSerializerTests { + + private fun subscriptionMap(): Cache { + val subMap: Cache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) + .maximumSize(100) + .build() + + subMap.put(Trace.InvocationId("test1", Instant.now()), ObservableSubscription(mock())) + + return subMap + } + + @Test + fun canSerializerBeRegistered() { + val sf = SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist) + + try { + sf.register(RpcServerObservableSerializer()) + } catch (e: Exception) { + throw Error("Observable serializer must be registerable with factory, unexpected exception - ${e.message}") + } + } + + @Test + fun canAssociateWithContext() { + val observable = TestObservableContext( + subscriptionMap(), + clientAddressToObservables = ConcurrentHashMap(), + deduplicationIdentity = "thisIsATest", + clientAddress = SimpleString("clientAddress")) + + val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable) + + assertEquals(1, newContext.properties.size) + assertTrue(newContext.properties.containsKey(RpcServerObservableSerializer.RpcObservableContextKey)) + assertEquals(observable, newContext.properties[RpcServerObservableSerializer.RpcObservableContextKey]) + } + + @Test + fun serialiseFakeObservable() { + val testClientAddress = "clientAddres" + val observable = TestObservableContext( + subscriptionMap(), + clientAddressToObservables = ConcurrentHashMap(), + deduplicationIdentity = "thisIsATest", + clientAddress = SimpleString(testClientAddress)) + + val sf = SerializerFactory( + cl = javaClass.classLoader, + whitelist = AllWhitelist + ).apply { + register(RpcServerObservableSerializer()) + } + + val obs = Observable.create({ 12 }) + val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable) + + try { + SerializationOutput(sf).serializeAndReturnSchema(obs, newContext) + } catch (e: Exception) { + throw Error("Serialization of observable should not throw - ${e.message}") + } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt new file mode 100644 index 0000000000..70ef79cc8b --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -0,0 +1,54 @@ +package net.corda.node.internal.serialization.testutils + +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.core.context.Trace +import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationCustomSerializer +import net.corda.node.serialization.amqp.RpcServerObservableSerializer +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.internal.serialization.AllWhitelist +import net.corda.nodeapi.internal.serialization.CordaSerializationMagic +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext + +/** + * Special serialization context for the round trip tests that allows for both server and client RPC + * operations + */ +class AMQPRoundTripRPCSerializationScheme( + private val serializationContext: SerializationContext, + cordappCustomSerializers: Set> = emptySet()) + : AbstractAMQPSerializationScheme( + cordappCustomSerializers +) { + constructor( + serializationContext: SerializationContext, + cordapps: List) : this(serializationContext, cordapps.customSerializers) + + override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { + return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply { + register(RpcClientObservableSerializer) + } + } + + override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { + return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply { + register(RpcServerObservableSerializer()) + } + } + + override fun canDeserializeVersion( + magic: CordaSerializationMagic, + target: SerializationContext.UseCase) = true + + fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) = + rpcClientSerializerFactory( + RpcClientObservableSerializer.createContext(serializationContext, observableContext) + .withProperty(RPCApi.RpcRequestOrObservableIdKey, id)) + + fun rpcServerSerializerFactory(observableContext: TestObservableContext) = + rpcServerSerializerFactory( + RpcServerObservableSerializer.createContext(serializationContext, observableContext)) +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt new file mode 100644 index 0000000000..cbfcaa8f5c --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt @@ -0,0 +1,18 @@ +package net.corda.node.internal.serialization.testutils + +import com.github.benmanes.caffeine.cache.Cache +import net.corda.core.context.Trace +import net.corda.node.services.messaging.ObservableContextInterface +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.RPCApi +import org.apache.activemq.artemis.api.core.SimpleString +import java.util.concurrent.ConcurrentHashMap + +class TestObservableContext( + override val observableMap: Cache, + override val clientAddressToObservables: ConcurrentHashMap>, + override val deduplicationIdentity: String, + override val clientAddress: SimpleString +) : ObservableContextInterface { + override fun sendMessage(serverToClient: RPCApi.ServerToClient) { } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt new file mode 100644 index 0000000000..54ad5e55ba --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt @@ -0,0 +1,17 @@ +package net.corda.node.internal.serialization.testutils + +import net.corda.core.serialization.SerializationContext +import net.corda.nodeapi.internal.serialization.AllWhitelist +import net.corda.nodeapi.internal.serialization.SerializationContextImpl +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic + +val serializationProperties: MutableMap = mutableMapOf() + +val serializationContext = SerializationContextImpl( + preferredSerializationVersion = amqpMagic, + deserializationClassLoader = ClassLoader.getSystemClassLoader(), + whitelist = AllWhitelist, + properties = serializationProperties, + objectReferencesEnabled = false, + useCase = SerializationContext.UseCase.Testing, + encoding = null) \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt new file mode 100644 index 0000000000..a1c20aae53 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt @@ -0,0 +1,11 @@ +package net.corda.node.internal.serialization.testutils + +import net.corda.core.serialization.SerializationContext +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactoryFactory + +class TestSerializerFactoryFactory : SerializerFactoryFactory() { + override fun make(context: SerializationContext): SerializerFactory { + return super.make(context) + } +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 5b3f1a964b..13abc74202 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -1,9 +1,9 @@ package net.corda.testing.node.internal import net.corda.client.mock.Generator -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture import net.corda.core.context.AuthServiceId import net.corda.core.context.Trace @@ -23,7 +23,7 @@ import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.RPCApi -import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.driver.JmxPolicy @@ -513,8 +513,8 @@ class RandomRpcUser { val hostAndPort = NetworkHostAndPort.parse(args[1]) val username = args[2] val password = args[3] - KryoClientSerializationScheme.initialiseSerialization() - val handle = RPCClient(hostAndPort, null, serializationContext = KRYO_RPC_CLIENT_CONTEXT).start(rpcClass, username, password) + AMQPClientSerializationScheme.initialiseSerialization() + val handle = RPCClient(hostAndPort, null, serializationContext = AMQP_RPC_CLIENT_CONTEXT).start(rpcClass, username, password) val callGenerators = rpcClass.declaredMethods.map { method -> Generator.sequence(method.parameters.map { generatorStore[it.type] ?: throw Exception("No generator for ${it.type}") diff --git a/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt b/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt index ee3936c4c2..a339d1040a 100644 --- a/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt +++ b/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt @@ -2,7 +2,7 @@ package net.corda.smoketesting import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCConnection -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.internal.* import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger @@ -59,7 +59,7 @@ class NodeProcess( val javaPath: Path = Paths.get(System.getProperty("java.home"), "bin", "java") val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(systemDefault()) val defaultNetworkParameters = run { - KryoClientSerializationScheme.createSerializationEnv().asContextEnv { + AMQPClientSerializationScheme.createSerializationEnv().asContextEnv { // There are no notaries in the network parameters for smoke test nodes. If this is required then we would // need to introduce the concept of a "network" which predefines the notaries, like the driver and MockNetwork NetworkParametersCopier(testNetworkParameters()) diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt index 5cfe318790..d896710d26 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt @@ -3,6 +3,7 @@ package net.corda.testing.core import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doAnswer import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.DoNotImplement import net.corda.core.internal.staticField import net.corda.core.serialization.internal.SerializationEnvironment import net.corda.core.serialization.internal.effectiveSerializationEnv diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt index ae2c734f96..eaebc3b134 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt @@ -2,13 +2,13 @@ package net.corda.testing.internal import com.nhaarman.mockito_kotlin.doNothing import com.nhaarman.mockito_kotlin.whenever -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.DoNotImplement +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.serialization.internal.* import net.corda.node.serialization.kryo.KryoServerSerializationScheme +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.serialization.* -import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import net.corda.testing.core.SerializationEnvironmentRule import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService @@ -39,8 +39,8 @@ internal fun createTestSerializationEnv(label: String): SerializationEnvironment return object : SerializationEnvironmentImpl( factory, AMQP_P2P_CONTEXT, - KRYO_RPC_SERVER_CONTEXT, - KRYO_RPC_CLIENT_CONTEXT, + AMQP_RPC_SERVER_CONTEXT, + AMQP_RPC_CLIENT_CONTEXT, AMQP_STORAGE_CONTEXT, KRYO_CHECKPOINT_CONTEXT ) { diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt index df5a534352..2a76aa1fb3 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt @@ -1,13 +1,13 @@ package net.corda.demobench import javafx.scene.image.Image +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.demobench.views.DemoBenchView import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme import tornadofx.* import java.io.InputStreamReader import java.nio.charset.StandardCharsets.UTF_8 @@ -60,7 +60,7 @@ class DemoBench : App(DemoBenchView::class) { nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { registerScheme(KryoClientSerializationScheme()) - registerScheme(AMQPClientSerializationScheme()) + registerScheme(AMQPClientSerializationScheme(emptyList())) }, AMQP_P2P_CONTEXT) } From 0c3a30edc82b71699fc1aaf349418626639508b1 Mon Sep 17 00:00:00 2001 From: Kat Baker Date: Thu, 10 May 2018 21:21:42 +0100 Subject: [PATCH 2/2] Corda-847 - Remove Kryo for RPC It's no longer used as we've switched over to AMQP for RPC calls so remove it from everywhere and only use it for checkpointing * Wire up demo bench post Kryo removal * Test Fixes * rebase and fix tests * Test Fix * wip * revert changes to api now we don't need to add annotations --- .ci/api-current.txt | 14 --- .../kryo/KryoClientSerializationScheme.kt | 55 ------------ .../kryo/RpcClientObservableSerializer.kt | 78 ----------------- .../corda/core/utilities/KotlinUtilsTest.kt | 24 +++-- docs/source/changelog.rst | 4 + .../serialization/SerializationScheme.kt | 3 +- .../serialization/amqp/SerializerFactory.kt | 6 +- .../internal/serialization/kryo/Kryo.kt | 79 ++--------------- .../ForbiddenLambdaSerializationTests.java | 31 +++---- .../internal/crypto/X509UtilitiesTest.kt | 2 - .../internal/serialization/kryo/KryoTests.kt | 17 +++- .../kotlin/net/corda/node/internal/Node.kt | 6 +- .../amqp/AMQPServerSerializationScheme.kt | 6 +- .../kryo/KryoServerSerializationScheme.kt | 13 +-- .../kryo/RpcServerObservableSerializer.kt | 87 ------------------- .../messaging/ObservableContextInterface.kt | 5 ++ .../node/services/messaging/RPCServer.kt | 2 +- .../RoundTripObservableSerializerTests.kt | 3 +- .../RpcServerObservableSerializerTests.kt | 7 +- .../testutils/AMQPTestSerialiationScheme.kt | 17 ++-- .../testutils/TestSerializerFactoryFactory.kt | 11 --- .../InternalSerializationTestHelpers.kt | 7 +- .../kotlin/net/corda/demobench/DemoBench.kt | 2 - .../net/corda/tools/shell/SSHServerTest.kt | 3 +- 24 files changed, 96 insertions(+), 386 deletions(-) delete mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt delete mode 100644 client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt delete mode 100644 node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt delete mode 100644 node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 107c0b060f..b088f61edb 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -76,16 +76,10 @@ public final class net.corda.core.concurrent.ConcurrencyUtils extends java.lang. @NotNull public static final String shortCircuitedTaskFailedMessage = "Short-circuited task failed:" ## -<<<<<<< HEAD public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future public abstract void then(kotlin.jvm.functions.Function1, ? extends W>) @NotNull public abstract java.util.concurrent.CompletableFuture toCompletableFuture() -======= -@net.corda.core.serialization.CordaSerializable public interface net.corda.core.concurrent.CordaFuture extends java.util.concurrent.Future - public abstract void then(kotlin.jvm.functions.Function1) - @org.jetbrains.annotations.NotNull public abstract concurrent.CompletableFuture toCompletableFuture() ->>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation ## @CordaSerializable public final class net.corda.core.context.Actor extends java.lang.Object @@ -2621,12 +2615,8 @@ public final class net.corda.core.messaging.DataFeed extends java.lang.Object public int hashCode() public String toString() ## -<<<<<<< HEAD @DoNotImplement public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable -======= -@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowHandle extends java.lang.AutoCloseable ->>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation public abstract void close() @NotNull public abstract net.corda.core.flows.StateMachineRunId getId() @@ -2652,12 +2642,8 @@ public final class net.corda.core.messaging.FlowHandleImpl extends java.lang.Obj public int hashCode() public String toString() ## -<<<<<<< HEAD @DoNotImplement public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle -======= -@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.FlowProgressHandle extends net.corda.core.messaging.FlowHandle ->>>>>>> CORDA-847 - Update api doc with additon of @CordaSerializable annotation public abstract void close() @NotNull public abstract rx.Observable getProgress() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt deleted file mode 100644 index 5a5397ca6d..0000000000 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt +++ /dev/null @@ -1,55 +0,0 @@ -package net.corda.client.rpc.internal.serialization.kryo - -import com.esotericsoftware.kryo.pool.KryoPool -import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.internal.SerializationEnvironment -import net.corda.core.serialization.internal.SerializationEnvironmentImpl -import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.nodeapi.internal.serialization.* -import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer -import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -import net.corda.nodeapi.internal.serialization.kryo.RPCKryo - -private val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic, - net.corda.core.serialization.SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.RPCClient, - null) - -class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P) - } - - override fun rpcClientKryoPool(context: SerializationContext): KryoPool { - return KryoPool.Builder { - DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, context), publicKeySerializer).apply { - classLoader = context.deserializationClassLoader - } - }.build() - } - - // We're on the client and don't have access to server classes. - override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - - companion object { - /** Call from main only. */ - fun initialiseSerialization(classLoader: ClassLoader? = null) { - nodeSerializationEnv = createSerializationEnv(classLoader) - } - - fun createSerializationEnv(classLoader: ClassLoader? = null): SerializationEnvironment { - return SerializationEnvironmentImpl( - SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(AMQPClientSerializationScheme(emptyList())) - }, - if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT, - rpcClientContext = if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) - } - } -} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt deleted file mode 100644 index c69081a667..0000000000 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt +++ /dev/null @@ -1,78 +0,0 @@ -package net.corda.client.rpc.internal.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import net.corda.client.rpc.internal.ObservableContext -import net.corda.core.context.Trace -import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.RPCApi -import rx.Notification -import rx.Observable -import rx.subjects.UnicastSubject -import java.time.Instant -import java.util.concurrent.atomic.AtomicInteger - -/** - * A [Serializer] to deserialize Observables once the corresponding Kryo instance has been provided with an [ObservableContext]. - */ -object RpcClientObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - fun createContext( - serializationContext: SerializationContext, - observableContext: ObservableContext - ): SerializationContext { - return serializationContext.withProperty(RpcObservableContextKey, observableContext) - } - - private fun pinInSubscriptions(observable: Observable, hardReferenceStore: MutableSet>): Observable { - val refCount = AtomicInteger(0) - return observable.doOnSubscribe { - if (refCount.getAndIncrement() == 0) { - require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" } - } - }.doOnUnsubscribe { - if (refCount.decrementAndGet() == 0) { - require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" } - } - } - } - - override fun read(kryo: Kryo, input: Input, type: Class>): Observable { - val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext - val observableId = input.readInvocationId() ?: throw IllegalStateException("Unable to read invocationId from Input.") - val observable = UnicastSubject.create>() - require(observableContext.observableMap.getIfPresent(observableId) == null) { - "Multiple Observables arrived with the same ID $observableId" - } - val rpcCallSite = getRpcCallSite(kryo, observableContext) - observableContext.observableMap.put(observableId, observable) - observableContext.callSiteMap?.put(observableId, rpcCallSite) - // We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users - // don't need to store a reference to the Observables themselves. - return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe { - // This causes Future completions to give warnings because the corresponding OnComplete sent from the server - // will arrive after the client unsubscribes from the observable and consequently invalidates the mapping. - // The unsubscribe is due to [ObservableToFuture]'s use of first(). - observableContext.observableMap.invalidate(observableId) - }.dematerialize() - } - - private fun Input.readInvocationId(): Trace.InvocationId? { - - val value = readString() ?: return null - val timestamp = readLong() - return Trace.InvocationId(value, Instant.ofEpochMilli(timestamp)) - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - throw UnsupportedOperationException("Cannot serialise Observables on the client side") - } - - private fun getRpcCallSite(kryo: Kryo, observableContext: ObservableContext): Throwable? { - val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId - return observableContext.callSiteMap?.get(rpcRequestOrObservableId) - } -} \ No newline at end of file diff --git a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt index f725babdf2..a62ec2333d 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt @@ -2,16 +2,20 @@ package net.corda.core.utilities import com.esotericsoftware.kryo.KryoException import net.corda.core.crypto.random63BitValue -import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize +import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT +import net.corda.nodeapi.internal.serialization.SerializationContextImpl +import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat import org.junit.Rule import org.junit.Test import org.junit.rules.ExpectedException +object EmptyWhitelist : ClassWhitelist { + override fun hasListed(type: Class<*>): Boolean = false +} + class KotlinUtilsTest { @Rule @JvmField @@ -20,6 +24,14 @@ class KotlinUtilsTest { @Rule val expectedEx: ExpectedException = ExpectedException.none() + val KRYO_CHECKPOINT_NOWHITELIST_CONTEXT = SerializationContextImpl(kryoMagic, + SerializationDefaults.javaClass.classLoader, + EmptyWhitelist, + emptyMap(), + true, + SerializationContext.UseCase.Checkpoint, + null) + @Test fun `transient property which is null`() { val test = NullTransientProperty() @@ -43,7 +55,7 @@ class KotlinUtilsTest { expectedEx.expect(KryoException::class.java) expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") val original = NonCapturingTransientProperty() - original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize() + original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT) } @Test @@ -61,8 +73,10 @@ class KotlinUtilsTest { fun `deserialise transient property with capturing lambda`() { expectedEx.expect(KryoException::class.java) expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") + val original = CapturingTransientProperty("Hello") - original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize() + + original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT) } private class NullTransientProperty { diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 331b6b5158..51e3d5bd89 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,10 @@ release, see :doc:`upgrade-notes`. Unreleased ========== + +* RPC Framework moved from Kryo to the Corda AMQP implementation [Corda-847]. This completes the removal + of ``Kryo`` from general use within Corda, remaining only for use in flow checkpointing. + * Set co.paralleluniverse.fibers.verifyInstrumentation=true in devMode. * Node will now gracefully fail to start if one of the required ports is already in use. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index 2163459104..7e660ca78e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -114,7 +114,8 @@ open class SerializationFactoryImpl( val lookupKey = magic to target return schemes.computeIfAbsent(lookupKey) { registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single? - logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes") + logger.warn("Cannot find serialization scheme for: [$lookupKey, " + + "${if (magic == amqpMagic) "AMQP" else if (magic == kryoMagic) "Kryo" else "UNKNOWN MAGIC"}] registeredSchemes are: $registeredSchemes") throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.") } to magic } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt index 8a56baf1b0..1ea7daf020 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt @@ -285,7 +285,11 @@ open class SerializerFactory( } private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer = serializersByType.computeIfAbsent(type) { - if (isPrimitive(clazz)) { + if (clazz.isSynthetic) { + // Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also + // captures Lambda expressions and other anonymous functions + throw NotSerializableException(type.typeName) + } else if (isPrimitive(clazz)) { AMQPPrimitiveSerializer(clazz) } else { findCustomSerializer(clazz, declaredType) ?: run { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt index 7a5111fd93..df8c3814ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt @@ -7,7 +7,6 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer import com.esotericsoftware.kryo.serializers.FieldSerializer import com.esotericsoftware.kryo.util.MapReferenceResolver -import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.PrivacySalt import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash @@ -18,8 +17,6 @@ import net.corda.core.serialization.SerializationContext.UseCase.Checkpoint import net.corda.core.serialization.SerializationContext.UseCase.Storage import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SerializedBytes -import net.corda.core.toFuture -import net.corda.core.toObservable import net.corda.core.transactions.* import net.corda.core.utilities.OpaqueBytes import net.corda.nodeapi.internal.crypto.X509CertificateFactory @@ -27,7 +24,6 @@ import net.corda.nodeapi.internal.serialization.CordaClassResolver import net.corda.nodeapi.internal.serialization.serializationContextKey import org.slf4j.Logger import org.slf4j.LoggerFactory -import rx.Observable import java.io.InputStream import java.lang.reflect.InvocationTargetException import java.security.PrivateKey @@ -46,39 +42,16 @@ import kotlin.reflect.jvm.isAccessible import kotlin.reflect.jvm.javaType /** - * Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead - * simple, totally non-extensible binary (sub)format. - * - * This is NOT what should be used in any final platform product, rather, the final state should be a precisely - * specified and standardised binary format with attention paid to anti-malleability, versioning and performance. - * FIX SBE is a potential candidate: it prioritises performance over convenience and was designed for HFT. Google - * Protocol Buffers with a minor tightening to make field reordering illegal is another possibility. - * - * FIX SBE: - * https://real-logic.github.io/simple-binary-encoding/ - * http://mechanical-sympathy.blogspot.co.at/2014/05/simple-binary-encoding.html - * Protocol buffers: - * https://developers.google.com/protocol-buffers/ - * - * But for now we use Kryo to maximise prototyping speed. - * - * Note that this code ignores *ALL* concerns beyond convenience, in particular it ignores: - * - * - Performance - * - Security - * - * This code will happily deserialise literally anything, including malicious streams that would reconstruct classes - * in invalid states, thus violating system invariants. It isn't designed to handle malicious streams and therefore, - * isn't usable beyond the prototyping stage. But that's fine: we can revisit serialisation technologies later after - * a formal evaluation process. - * - * We now distinguish between internal, storage related Kryo and external, network facing Kryo. We presently use - * some non-whitelisted classes as part of internal storage. - * TODO: eliminate internal, storage related whitelist issues, such as private keys in blob storage. + * Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead + * simple, totally non-extensible binary (sub)format. Used exclusively within Corda for checkpointing flows as + * it will happily deserialise literally anything, including malicious streams that would reconstruct classes + * in invalid states and thus violating system invariants. In the context of checkpointing a Java stack, this is + * absolutely the functionality we desire, for a stable binary wire format and persistence technology, we have + * the AMQP implementation. */ /** - * A serialiser that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure + * A serializer that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure * type safety hack. */ object SerializedBytesSerializer : Serializer>() { @@ -391,44 +364,6 @@ open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapRefe } } -/** - * The Kryo used for the RPC wire protocol. - */ -// Every type in the wire protocol is listed here explicitly. -// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes, -// because we can see everything we're using in one place. -class RPCKryo(observableSerializer: Serializer>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) { - init { - DefaultKryoCustomizer.customize(this) - - // RPC specific classes - register(InputStream::class.java, InputStreamSerializer) - register(Observable::class.java, observableSerializer) - register(CordaFuture::class, - read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() }, - write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) } - ) - } - - override fun getRegistration(type: Class<*>): Registration { - if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) { - return super.getRegistration(Observable::class.java) - } - if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) { - return super.getRegistration(InputStream::class.java) - } - if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) { - return super.getRegistration(CordaFuture::class.java) - } - type.requireExternal("RPC not allowed to deserialise internal classes") - return super.getRegistration(type) - } - - private fun Class<*>.requireExternal(msg: String) { - require(!name.startsWith("net.corda.node.") && ".internal" !in name) { "$msg: $name" } - } -} - inline fun Kryo.register( type: KClass, crossinline read: (Kryo, Input) -> T, diff --git a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java index db8da10b45..8ae47ea391 100644 --- a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java +++ b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java @@ -4,13 +4,13 @@ import com.google.common.collect.Maps; import net.corda.core.serialization.SerializationContext; import net.corda.core.serialization.SerializationFactory; import net.corda.core.serialization.SerializedBytes; -import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer; -import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt; +import net.corda.nodeapi.internal.serialization.amqp.SchemaKt; import net.corda.testing.core.SerializationEnvironmentRule; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import java.io.NotSerializableException; import java.io.Serializable; import java.util.EnumSet; import java.util.concurrent.Callable; @@ -33,20 +33,17 @@ public final class ForbiddenLambdaSerializationTests { @Test public final void serialization_fails_for_serializable_java_lambdas() { contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(), + SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null); String value = "Hey"; Callable target = (Callable & Serializable) () -> value; Throwable throwable = catchThrowable(() -> serialize(target, context)); - assertThat(throwable).isNotNull(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) { - assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE); - } else { - assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes"); - } + assertThat(throwable) + .isNotNull() + .isInstanceOf(NotSerializableException.class) + .hasMessageContaining(getClass().getName()); }); } @@ -54,21 +51,17 @@ public final class ForbiddenLambdaSerializationTests { @SuppressWarnings("unchecked") public final void serialization_fails_for_not_serializable_java_lambdas() { contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(), + SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null); String value = "Hey"; Callable target = () -> value; Throwable throwable = catchThrowable(() -> serialize(target, context)); - assertThat(throwable).isNotNull(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) { - assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE); - } else { - assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes"); - } + assertThat(throwable) + .isNotNull() + .isInstanceOf(NotSerializableException.class) + .hasMessageContaining(getClass().getName()); }); } diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt index 5508e857d9..ed9345c00e 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt @@ -9,14 +9,12 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.node.serialization.amqp.AMQPServerSerializationScheme -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.SerializationContextImpl import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.nodeapi.internal.serialization.amqp.amqpMagic -import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.TestIdentity diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index 860c7b64c0..8607d04d8c 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.KryoSerializable import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.primitives.Ints import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever @@ -15,7 +16,6 @@ import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.persistence.NodeAttachmentService import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.ALICE_NAME @@ -35,6 +35,17 @@ import java.time.Instant import java.util.* import kotlin.test.* +class TestScheme : AbstractKryoSerializationScheme() { + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient + } + + override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + + override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + +} + @RunWith(Parameterized::class) class KryoTests(private val compression: CordaSerializationEncoding?) { companion object { @@ -49,7 +60,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { @Before fun setup() { - factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } + factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) } context = SerializationContextImpl(kryoMagic, javaClass.classLoader, AllWhitelist, @@ -270,7 +281,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { } } Tmp() - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } + val factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) } val context = SerializationContextImpl(kryoMagic, javaClass.classLoader, AllWhitelist, diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 26fa515e6d..944faf6579 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -1,7 +1,7 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.Emoji import net.corda.core.internal.concurrent.openFuture @@ -384,9 +384,9 @@ open class Node(configuration: NodeConfiguration, val classloader = cordappLoader.appClassLoader nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { - registerScheme(KryoServerSerializationScheme()) registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps)) - registerScheme(KryoClientSerializationScheme()) + registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps)) + registerScheme(KryoServerSerializationScheme() ) }, p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt index b645b1289d..dc9ea63428 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt @@ -39,8 +39,8 @@ class AMQPServerSerializationScheme( override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { return canDeserializeVersion(magic) && - ( target == SerializationContext.UseCase.P2P - || target == SerializationContext.UseCase.Storage - || target == SerializationContext.UseCase.RPCServer) + ( target == SerializationContext.UseCase.P2P + || target == SerializationContext.UseCase.Storage + || target == SerializationContext.UseCase.RPCServer) } } diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt index acf4c7f461..83eec03995 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt @@ -4,22 +4,15 @@ import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.serialization.SerializationContext import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -import net.corda.nodeapi.internal.serialization.kryo.RPCKryo class KryoServerSerializationScheme : AbstractKryoSerializationScheme() { override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient + return magic == kryoMagic && target == SerializationContext.UseCase.Checkpoint } override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - override fun rpcServerKryoPool(context: SerializationContext): KryoPool { - return KryoPool.Builder { - DefaultKryoCustomizer.customize(RPCKryo(RpcServerObservableSerializer, context), publicKeySerializer).apply { - classLoader = context.deserializationClassLoader - } - }.build() - } + override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt deleted file mode 100644 index a51448e83a..0000000000 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt +++ /dev/null @@ -1,87 +0,0 @@ -package net.corda.node.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import net.corda.core.context.Trace -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationDefaults -import net.corda.node.services.messaging.ObservableSubscription -import net.corda.node.services.messaging.RPCServer -import net.corda.nodeapi.RPCApi -import org.slf4j.LoggerFactory -import rx.Notification -import rx.Observable -import rx.Subscriber - -object RpcServerObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - private val log = LoggerFactory.getLogger(javaClass) - - fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { - return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) - } - - override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { - throw UnsupportedOperationException() - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - val observableId = Trace.InvocationId.newInstance() - val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext - output.writeInvocationId(observableId) - val observableWithSubscription = ObservableSubscription( - // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing - // must be done again within the subscriber - subscription = observable.materialize().subscribe( - object : Subscriber>() { - override fun onNext(observation: Notification<*>) { - if (!isUnsubscribed) { - val message = RPCApi.ServerToClient.Observation( - id = observableId, - content = observation, - deduplicationIdentity = observableContext.deduplicationIdentity - ) - observableContext.sendMessage(message) - } - } - - override fun onError(exception: Throwable) { - log.error("onError called in materialize()d RPC Observable", exception) - } - - override fun onCompleted() { - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables != null) { - observables.remove(observableId) - if (observables.isEmpty()) { - null - } else { - observables - } - } else { - null - } - } - } - } - ) - ) - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables == null) { - hashSetOf(observableId) - } else { - observables.add(observableId) - observables - } - } - observableContext.observableMap.put(observableId, observableWithSubscription) - } - - private fun Output.writeInvocationId(id: Trace.InvocationId) { - writeString(id.value) - writeLong(id.timestamp.toEpochMilli()) - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt index ce0521d686..32adde2e64 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt @@ -6,6 +6,11 @@ import net.corda.nodeapi.RPCApi import org.apache.activemq.artemis.api.core.SimpleString import java.util.concurrent.ConcurrentHashMap +/** + * An observable context is constructed on each RPC request. If subsequently a nested Observable is encountered this + * same context is propagated by the serialization context. This way all observations rooted in a single RPC will be + * muxed correctly. Note that the context construction itself is quite cheap. + */ interface ObservableContextInterface { fun sendMessage(serverToClient: RPCApi.ServerToClient) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 7931bd158b..4574afffef 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -409,7 +409,7 @@ class RPCServer( /* * We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this - * same context is propagated by the instrumented KryoPool. This way all observations rooted in a single RPC will be + * same context is propagated by serialization context. This way all observations rooted in a single RPC will be * muxed correctly. Note that the context construction itself is quite cheap. */ inner class ObservableContext( diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt index e661007909..4467a271d6 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -60,7 +60,8 @@ class RoundTripObservableSerializerTests { @Test fun roundTripTest1() { - val serializationScheme = AMQPRoundTripRPCSerializationScheme(serializationContext) + val serializationScheme = AMQPRoundTripRPCSerializationScheme( + serializationContext, emptySet(), ConcurrentHashMap()) // Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap, // the client as a property of the deserializer which, in the actual RPC client, is pulled off of diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt index 17fa31d95c..6778a2ef91 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt @@ -35,7 +35,7 @@ class RpcServerObservableSerializerTests { @Test fun canSerializerBeRegistered() { - val sf = SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist) + val sf = SerializerFactory(AllWhitelist, javaClass.classLoader) try { sf.register(RpcServerObservableSerializer()) @@ -68,10 +68,7 @@ class RpcServerObservableSerializerTests { deduplicationIdentity = "thisIsATest", clientAddress = SimpleString(testClientAddress)) - val sf = SerializerFactory( - cl = javaClass.classLoader, - whitelist = AllWhitelist - ).apply { + val sf = SerializerFactory(AllWhitelist, javaClass.classLoader).apply { register(RpcServerObservableSerializer()) } diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt index 70ef79cc8b..73a48147ba 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -3,6 +3,7 @@ package net.corda.node.internal.serialization.testutils import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.core.context.Trace import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationCustomSerializer import net.corda.node.serialization.amqp.RpcServerObservableSerializer @@ -11,30 +12,30 @@ import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import java.util.concurrent.ConcurrentHashMap import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext /** * Special serialization context for the round trip tests that allows for both server and client RPC * operations */ + + class AMQPRoundTripRPCSerializationScheme( private val serializationContext: SerializationContext, - cordappCustomSerializers: Set> = emptySet()) + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory>) : AbstractAMQPSerializationScheme( - cordappCustomSerializers + cordappCustomSerializers, serializerFactoriesForContexts ) { - constructor( - serializationContext: SerializationContext, - cordapps: List) : this(serializationContext, cordapps.customSerializers) - override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { - return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply { + return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { register(RpcClientObservableSerializer) } } override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { - return SerializerFactory(cl = javaClass.classLoader, whitelist = AllWhitelist).apply { + return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { register(RpcServerObservableSerializer()) } } diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt deleted file mode 100644 index a1c20aae53..0000000000 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializerFactoryFactory.kt +++ /dev/null @@ -1,11 +0,0 @@ -package net.corda.node.internal.serialization.testutils - -import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory -import net.corda.nodeapi.internal.serialization.amqp.SerializerFactoryFactory - -class TestSerializerFactoryFactory : SerializerFactoryFactory() { - override fun make(context: SerializationContext): SerializerFactory { - return super.make(context) - } -} \ No newline at end of file diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt index eaebc3b134..abc75b4eba 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt @@ -4,10 +4,9 @@ import com.nhaarman.mockito_kotlin.doNothing import com.nhaarman.mockito_kotlin.whenever import net.corda.core.DoNotImplement import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.serialization.internal.* -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.serialization.amqp.AMQPServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.SerializationEnvironmentRule import java.util.concurrent.ConcurrentHashMap @@ -31,10 +30,10 @@ fun withoutTestSerialization(callable: () -> T): T { // TODO: Delete this, s internal fun createTestSerializationEnv(label: String): SerializationEnvironmentImpl { val factory = SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(KryoServerSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) registerScheme(AMQPServerSerializationScheme(emptyList())) + // needed for checkpointing + registerScheme(KryoServerSerializationScheme()) } return object : SerializationEnvironmentImpl( factory, diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt index 2a76aa1fb3..4b909ed4f6 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt @@ -2,7 +2,6 @@ package net.corda.demobench import javafx.scene.image.Image import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.demobench.views.DemoBenchView @@ -59,7 +58,6 @@ class DemoBench : App(DemoBenchView::class) { private fun initialiseSerialization() { nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) }, AMQP_P2P_CONTEXT) diff --git a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt index 919acd78cf..9aceb2cff2 100644 --- a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt +++ b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt @@ -22,6 +22,7 @@ import org.assertj.core.api.Assertions.assertThat import org.bouncycastle.util.io.Streams import org.junit.Ignore import org.junit.Test +import java.lang.Thread.sleep import java.net.ConnectException import kotlin.test.assertTrue import kotlin.test.fail @@ -55,7 +56,7 @@ class SSHServerTest { // The driver will automatically pick up the annotated flows below driver { val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), - customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + customOverrides = mapOf("sshd" to mapOf("port" to 2222)) /*, startInSameProcess = true */) node.getOrThrow() val session = JSch().getSession("u", "localhost", 2222)