diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt index a223cf7210..5a68349f85 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt @@ -32,9 +32,9 @@ class BlacklistKotlinClosureTest { driver(DriverParameters(startNodesInProcess = true)) { val rpc = startNode(providedName = ALICE_NAME).getOrThrow().rpc val packet = Packet { EVIL } - assertThatExceptionOfType(KryoException::class.java) + assertThatExceptionOfType(RPCException::class.java) .isThrownBy { rpc.startFlow(::FlowC, packet) } - .withMessageContaining("is not annotated or on the whitelist, so cannot be used in serialization") + .withMessageContaining("is not on the whitelist or annotated with @CordaSerializable") } } } \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 6080107d3d..4eb3dc4d83 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -1,6 +1,6 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Actor @@ -11,7 +11,7 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.config.SSLConfiguration -import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT import java.time.Duration /** @@ -111,7 +111,9 @@ class CordaRPCClient private constructor( private val haAddressPool: List = emptyList() ) { @JvmOverloads - constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null) + constructor(hostAndPort: NetworkHostAndPort, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) + : this(hostAndPort, configuration, null) /** * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. @@ -146,7 +148,7 @@ class CordaRPCClient private constructor( effectiveSerializationEnv } catch (e: IllegalStateException) { try { - KryoClientSerializationScheme.initialiseSerialization(classLoader) + AMQPClientSerializationScheme.initialiseSerialization() } catch (e: IllegalStateException) { // Race e.g. two of these constructed in parallel, ignore. } @@ -158,12 +160,12 @@ class CordaRPCClient private constructor( RPCClient( tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), configuration, - if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + if (classLoader != null) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else AMQP_RPC_CLIENT_CONTEXT) } else { RPCClient(haAddressPool, sslConfiguration, configuration, - if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + if (classLoader != null) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else AMQP_RPC_CLIENT_CONTEXT) } } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index d13116daed..c42138e3fe 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -10,7 +10,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion -import net.corda.client.rpc.internal.serialization.kryo.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt new file mode 100644 index 0000000000..c82779fc03 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt @@ -0,0 +1,63 @@ +package net.corda.client.rpc.internal.serialization.amqp + +import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationCustomSerializer +import net.corda.core.serialization.internal.SerializationEnvironment +import net.corda.core.serialization.internal.SerializationEnvironmentImpl +import net.corda.core.serialization.internal.nodeSerializationEnv +import net.corda.nodeapi.internal.serialization.* +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic +import java.util.concurrent.ConcurrentHashMap +import net.corda.nodeapi.internal.serialization.amqp.custom.RxNotificationSerializer + +/** + * When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation. + * This scheme is for use by the RPC Client calls. + */ +class AMQPClientSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> + ) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + @Suppress("UNUSED") + constructor() : this(emptySet(), ConcurrentHashMap()) + + companion object { + /** Call from main only. */ + fun initialiseSerialization() { + nodeSerializationEnv = createSerializationEnv() + } + + fun createSerializationEnv(): SerializationEnvironment { + return SerializationEnvironmentImpl( + SerializationFactoryImpl().apply { + registerScheme(AMQPClientSerializationScheme(emptyList())) + }, + storageContext = AMQP_STORAGE_CONTEXT, + p2pContext = AMQP_P2P_CONTEXT, + rpcClientContext = AMQP_RPC_CLIENT_CONTEXT, + rpcServerContext = AMQP_RPC_SERVER_CONTEXT) + } + } + + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase) = + magic == amqpMagic && ( + target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P) + + override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { + return SerializerFactory(context.whitelist, ClassLoader.getSystemClassLoader()).apply { + register(RpcClientObservableSerializer) + register(RpcClientCordaFutureSerializer(this)) + register(RxNotificationSerializer(this)) + } + } + + override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { + throw UnsupportedOperationException() + } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt new file mode 100644 index 0000000000..258a2d66ec --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientCordaFutureSerializer.kt @@ -0,0 +1,35 @@ +package net.corda.client.rpc.internal.serialization.amqp + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.toFuture +import net.corda.core.toObservable +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import rx.Observable +import java.io.NotSerializableException + +/** + * Serializer for [CordaFuture] instances that can only deserialize such objects (just as the server + * side can only serialize them). Futures will have been converted to an Rx [Observable] for serialization. + */ +class RpcClientCordaFutureSerializer (factory: SerializerFactory) + : CustomSerializer.Proxy, RpcClientCordaFutureSerializer.FutureProxy>( + CordaFuture::class.java, + RpcClientCordaFutureSerializer.FutureProxy::class.java, factory +) { + override fun fromProxy(proxy: FutureProxy): CordaFuture<*> { + try { + return proxy.observable.toFuture() + } catch (e: NotSerializableException) { + throw NotSerializableException("Failed to deserialize Future from proxy Observable - ${e.message}\n").apply { + initCause(e.cause) + } + } + } + + override fun toProxy(obj: CordaFuture<*>): FutureProxy { + throw UnsupportedOperationException() + } + + data class FutureProxy(val observable: Observable<*>) +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt new file mode 100644 index 0000000000..18a73afa71 --- /dev/null +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt @@ -0,0 +1,127 @@ +package net.corda.client.rpc.internal.serialization.amqp + + +import net.corda.client.rpc.internal.ObservableContext +import net.corda.core.context.Trace +import net.corda.core.serialization.SerializationContext +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.internal.serialization.amqp.* +import org.apache.qpid.proton.codec.Data +import rx.Notification +import rx.Observable +import rx.subjects.UnicastSubject +import java.io.NotSerializableException +import java.lang.reflect.Type +import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger +import javax.transaction.NotSupportedException + +/** + * Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, + * just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized, + * what is actually sent is a reference to the observable that can then be subscribed to. + */ +object RpcClientObservableSerializer : CustomSerializer.Implements>(Observable::class.java) { + private object RpcObservableContextKey + + fun createContext( + serializationContext: SerializationContext, + observableContext: ObservableContext + ) = serializationContext.withProperty(RpcObservableContextKey, observableContext) + + private fun pinInSubscriptions(observable: Observable, hardReferenceStore: MutableSet>): Observable { + val refCount = AtomicInteger(0) + return observable.doOnSubscribe { + if (refCount.getAndIncrement() == 0) { + require(hardReferenceStore.add(observable)) { + "Reference store already contained reference $this on add" + } + } + }.doOnUnsubscribe { + if (refCount.decrementAndGet() == 0) { + require(hardReferenceStore.remove(observable)) { + "Reference store did not contain reference $this on remove" + } + } + } + } + + override val schemaForDocumentation = Schema( + listOf( + CompositeType( + name = type.toString(), + label = "", + provides = emptyList(), + descriptor = descriptor, + fields = listOf( + Field( + name = "observableId", + type = "string", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false), + Field( + name = "observableInstant", + type = "long", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false) + )))) + + /** + * Converts the serialized form, a blob, back into an Observable + */ + override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput, + context: SerializationContext + ): Observable<*> { + if (RpcObservableContextKey !in context.properties) { + throw NotSerializableException("Missing Observable Context Key on Client Context") + } + + val observableContext = + context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext + + if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list") + if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}") + + val observableId: Trace.InvocationId = Trace.InvocationId((obj[0] as String), Instant.ofEpochMilli((obj[1] as Long))) + val observable = UnicastSubject.create>() + + require(observableContext.observableMap.getIfPresent(observableId) == null) { + "Multiple Observables arrived with the same ID $observableId" + } + + val rpcCallSite = getRpcCallSite(context, observableContext) + + observableContext.observableMap.put(observableId, observable) + observableContext.callSiteMap?.put(observableId, rpcCallSite) + + // We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users + // don't need to store a reference to the Observables themselves. + return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe { + // This causes Future completions to give warnings because the corresponding OnComplete sent from the server + // will arrive after the client unsubscribes from the observable and consequently invalidates the mapping. + // The unsubscribe is due to [ObservableToFuture]'s use of first(). + observableContext.observableMap.invalidate(observableId) + }.dematerialize() + } + + private fun getRpcCallSite(context: SerializationContext, observableContext: ObservableContext): Throwable? { + val rpcRequestOrObservableId = context.properties[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId + return observableContext.callSiteMap?.get(rpcRequestOrObservableId) + } + + override fun writeDescribedObject( + obj: Observable<*>, + data: Data, + type: Type, + output: SerializationOutput, + context: SerializationContext + ) { + throw NotSupportedException() + } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt deleted file mode 100644 index 06ec72e244..0000000000 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/KryoClientSerializationScheme.kt +++ /dev/null @@ -1,50 +0,0 @@ -package net.corda.client.rpc.internal.serialization.kryo - -import com.esotericsoftware.kryo.pool.KryoPool -import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.internal.serialization.CordaSerializationMagic -import net.corda.core.serialization.internal.SerializationEnvironment -import net.corda.core.serialization.internal.SerializationEnvironmentImpl -import net.corda.core.serialization.internal.nodeSerializationEnv -import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT -import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT -import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer -import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -import net.corda.nodeapi.internal.serialization.kryo.RPCKryo - -class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return magic == kryoMagic && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P) - } - - override fun rpcClientKryoPool(context: SerializationContext): KryoPool { - return KryoPool.Builder { - DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, context), publicKeySerializer).apply { - classLoader = context.deserializationClassLoader - } - }.build() - } - - // We're on the client and don't have access to server classes. - override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - - companion object { - /** Call from main only. */ - fun initialiseSerialization(classLoader: ClassLoader? = null) { - nodeSerializationEnv = createSerializationEnv(classLoader) - } - - fun createSerializationEnv(classLoader: ClassLoader? = null): SerializationEnvironment { - return SerializationEnvironmentImpl( - SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(AMQPClientSerializationScheme(emptyList())) - }, - if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT, - rpcClientContext = if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) - } - } -} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt deleted file mode 100644 index 99749093e2..0000000000 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/kryo/RpcClientObservableSerializer.kt +++ /dev/null @@ -1,75 +0,0 @@ -package net.corda.client.rpc.internal.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import net.corda.client.rpc.internal.ObservableContext -import net.corda.core.context.Trace -import net.corda.core.serialization.SerializationContext -import net.corda.nodeapi.RPCApi -import rx.Notification -import rx.Observable -import rx.subjects.UnicastSubject -import java.time.Instant -import java.util.concurrent.atomic.AtomicInteger - -/** - * A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext]. - */ -object RpcClientObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext { - return serializationContext.withProperty(RpcObservableContextKey, observableContext) - } - - private fun pinInSubscriptions(observable: Observable, hardReferenceStore: MutableSet>): Observable { - val refCount = AtomicInteger(0) - return observable.doOnSubscribe { - if (refCount.getAndIncrement() == 0) { - require(hardReferenceStore.add(observable)) { "Reference store already contained reference $this on add" } - } - }.doOnUnsubscribe { - if (refCount.decrementAndGet() == 0) { - require(hardReferenceStore.remove(observable)) { "Reference store did not contain reference $this on remove" } - } - } - } - - override fun read(kryo: Kryo, input: Input, type: Class>): Observable { - val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext - val observableId = input.readInvocationId() ?: throw IllegalStateException("Unable to read invocationId from Input.") - val observable = UnicastSubject.create>() - require(observableContext.observableMap.getIfPresent(observableId) == null) { - "Multiple Observables arrived with the same ID $observableId" - } - val rpcCallSite = getRpcCallSite(kryo, observableContext) - observableContext.observableMap.put(observableId, observable) - observableContext.callSiteMap?.put(observableId, rpcCallSite) - // We pin all Observables into a hard reference store (rooted in the RPC proxy) on subscription so that users - // don't need to store a reference to the Observables themselves. - return pinInSubscriptions(observable, observableContext.hardReferenceStore).doOnUnsubscribe { - // This causes Future completions to give warnings because the corresponding OnComplete sent from the server - // will arrive after the client unsubscribes from the observable and consequently invalidates the mapping. - // The unsubscribe is due to [ObservableToFuture]'s use of first(). - observableContext.observableMap.invalidate(observableId) - }.dematerialize() - } - - private fun Input.readInvocationId(): Trace.InvocationId? { - - val value = readString() ?: return null - val timestamp = readLong() - return Trace.InvocationId(value, Instant.ofEpochMilli(timestamp)) - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - throw UnsupportedOperationException("Cannot serialise Observables on the client side") - } - - private fun getRpcCallSite(kryo: Kryo, observableContext: ObservableContext): Throwable? { - val rpcRequestOrObservableId = kryo.context[RPCApi.RpcRequestOrObservableIdKey] as Trace.InvocationId - return observableContext.callSiteMap?.get(rpcRequestOrObservableId) - } -} diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt index 43cd0c1ce1..d2a0a2c977 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt @@ -1,5 +1,6 @@ package net.corda.client.rpc +import net.corda.core.CordaRuntimeException import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture @@ -11,6 +12,7 @@ import net.corda.testing.node.internal.RPCDriverDSL import net.corda.testing.node.internal.rpcDriver import net.corda.testing.node.internal.rpcTestUser import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -77,9 +79,10 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() { // Does nothing, doesn't throw. proxy.void() - assertEquals("Barf!", assertFailsWith { - proxy.barf() - }.message) + assertThatThrownBy { proxy.barf() } + .isInstanceOf(CordaRuntimeException::class.java) + .hasMessage("java.lang.IllegalArgumentException: Barf!") + assertEquals("hi 5", proxy.someCalculation("hi", 5)) } diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt index fec7adae5d..2ab308af1d 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCFailureTests.kt @@ -1,6 +1,6 @@ package net.corda.client.rpc -import com.esotericsoftware.kryo.KryoException +import net.corda.core.CordaRuntimeException import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.* @@ -48,23 +48,29 @@ class RPCFailureTests { @Test fun `kotlin NPE`() = rpc { - assertThatThrownBy { it.kotlinNPE() }.isInstanceOf(KotlinNullPointerException::class.java) + assertThatThrownBy { it.kotlinNPE() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("kotlin.KotlinNullPointerException") } @Test fun `kotlin NPE async`() = rpc { val future = it.kotlinNPEAsync() - assertThatThrownBy { future.getOrThrow() }.isInstanceOf(KotlinNullPointerException::class.java) + assertThatThrownBy { future.getOrThrow() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("kotlin.KotlinNullPointerException") } @Test - fun unserializable() = rpc { - assertThatThrownBy { it.getUnserializable() }.isInstanceOf(KryoException::class.java) + fun `unserializable`() = rpc { + assertThatThrownBy { it.getUnserializable() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("java.io.NotSerializableException:") + .hasMessageContaining("Unserializable is not on the whitelist or annotated with @CordaSerializable.") } @Test fun `unserializable async`() = rpc { val future = it.getUnserializableAsync() - assertThatThrownBy { future.getOrThrow() }.isInstanceOf(KryoException::class.java) + assertThatThrownBy { future.getOrThrow() }.isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining("java.io.NotSerializableException:") + .hasMessageContaining("Unserializable is not on the whitelist or annotated with @CordaSerializable.") } } diff --git a/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt b/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt index 4977f3a34b..2f6d95795a 100644 --- a/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt +++ b/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt @@ -1,5 +1,6 @@ package net.corda.core.concurrent +import net.corda.core.serialization.CordaSerializable import java.util.concurrent.CompletableFuture import java.util.concurrent.Future @@ -7,6 +8,7 @@ import java.util.concurrent.Future * Same as [Future] with additional methods to provide some of the features of [java.util.concurrent.CompletableFuture] while minimising the API surface area. * In Kotlin, to avoid compile errors, whenever CordaFuture is used in a parameter or extension method receiver type, its type parameter should be specified with out variance. */ +@CordaSerializable interface CordaFuture : Future { /** * Run the given callback when this future is done, on the completion thread. diff --git a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt index ad4d40ec16..4d540d69c8 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt @@ -10,6 +10,7 @@ import rx.Observable * [FlowHandle] is a serialisable handle for the started flow, parameterised by the type of the flow's return value. */ @DoNotImplement +@CordaSerializable interface FlowHandle : AutoCloseable { /** * The started state machine's ID. diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt index dd94c74898..c9b8cc686c 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationAPI.kt @@ -185,6 +185,15 @@ interface SerializationContext { enum class UseCase { P2P, RPCServer, RPCClient, Storage, Checkpoint, Testing } } +/** + * Set of well known properties that may be set on a serialization context. This doesn't preclude + * others being set that aren't keyed on this enumeration, but for general use properties adding a + * well known key here is preferred. + */ +enum class ContextPropertyKeys { + SERIALIZERS +} + /** * Global singletons to be used as defaults that are injected elsewhere (generally, in the node or in RPC client). */ diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt index 3de86d449d..05852302ce 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationCustomSerializer.kt @@ -6,7 +6,7 @@ package net.corda.core.serialization * a proxy serializer can be written that extends this type whose purpose is to move between those an * unserializable types and an intermediate representation. * - * NOTE: The proxy object should be specified as a seperate class. However, this can be defined within the + * NOTE: The proxy object should be specified as a separate class. However, this can be defined within the * scope of the custom serializer. */ interface SerializationCustomSerializer { diff --git a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt index f725babdf2..a62ec2333d 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/KotlinUtilsTest.kt @@ -2,16 +2,20 @@ package net.corda.core.utilities import com.esotericsoftware.kryo.KryoException import net.corda.core.crypto.random63BitValue -import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize +import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.KRYO_CHECKPOINT_CONTEXT +import net.corda.nodeapi.internal.serialization.SerializationContextImpl +import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat import org.junit.Rule import org.junit.Test import org.junit.rules.ExpectedException +object EmptyWhitelist : ClassWhitelist { + override fun hasListed(type: Class<*>): Boolean = false +} + class KotlinUtilsTest { @Rule @JvmField @@ -20,6 +24,14 @@ class KotlinUtilsTest { @Rule val expectedEx: ExpectedException = ExpectedException.none() + val KRYO_CHECKPOINT_NOWHITELIST_CONTEXT = SerializationContextImpl(kryoMagic, + SerializationDefaults.javaClass.classLoader, + EmptyWhitelist, + emptyMap(), + true, + SerializationContext.UseCase.Checkpoint, + null) + @Test fun `transient property which is null`() { val test = NullTransientProperty() @@ -43,7 +55,7 @@ class KotlinUtilsTest { expectedEx.expect(KryoException::class.java) expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") val original = NonCapturingTransientProperty() - original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize() + original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT) } @Test @@ -61,8 +73,10 @@ class KotlinUtilsTest { fun `deserialise transient property with capturing lambda`() { expectedEx.expect(KryoException::class.java) expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") + val original = CapturingTransientProperty("Hello") - original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize() + + original.serialize(context = KRYO_CHECKPOINT_CONTEXT).deserialize(context = KRYO_CHECKPOINT_NOWHITELIST_CONTEXT) } private class NullTransientProperty { diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 331b6b5158..51e3d5bd89 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,10 @@ release, see :doc:`upgrade-notes`. Unreleased ========== + +* RPC Framework moved from Kryo to the Corda AMQP implementation [Corda-847]. This completes the removal + of ``Kryo`` from general use within Corda, remaining only for use in flow checkpointing. + * Set co.paralleluniverse.fibers.verifyInstrumentation=true in devMode. * Node will now gracefully fail to start if one of the required ports is already in use. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt b/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt index 0751e2681a..78e6dee579 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/InternalNodeException.kt @@ -28,7 +28,9 @@ class InternalNodeException(message: String) : CordaRuntimeException(message) { (wrapped as? CordaRuntimeException)?.setCause(null) return when { whitelisted.any { it.isInstance(wrapped) } -> wrapped - else -> InternalNodeException(DEFAULT_MESSAGE) + else -> InternalNodeException(DEFAULT_MESSAGE).apply { + stackTrace = emptyArray() + } } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index 395c08ddfa..30958d8546 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -23,10 +23,11 @@ import net.corda.nodeapi.internal.ContractsJarFile import net.corda.nodeapi.internal.DEV_ROOT_CA import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX -import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.CordaSerializationMagic +import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import java.nio.file.Path @@ -278,7 +279,7 @@ class NetworkBootstrapper { _contextSerializationEnv.set(SerializationEnvironmentImpl( SerializationFactoryImpl().apply { registerScheme(KryoParametersSerializationScheme) - registerScheme(AMQPServerSerializationScheme()) + registerScheme(AMQPParametersSerializationScheme) }, AMQP_P2P_CONTEXT) ) @@ -292,4 +293,13 @@ class NetworkBootstrapper { override fun rpcClientKryoPool(context: SerializationContext) = throw UnsupportedOperationException() override fun rpcServerKryoPool(context: SerializationContext) = throw UnsupportedOperationException() } + + private object AMQPParametersSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) { + override fun rpcClientSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException() + override fun rpcServerSerializerFactory(context: SerializationContext) = throw UnsupportedOperationException() + + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return magic == amqpMagic && target == SerializationContext.UseCase.P2P + } + } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index c3a3fcce91..bb079ce7e4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -352,7 +352,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, val connection = event.connection val channel = connection?.context as? Channel if (channel != null) { - val appProperties = HashMap(amqpMessage.applicationProperties.value as Map) + val appProperties = HashMap(amqpMessage.applicationProperties.value) appProperties["_AMQ_VALIDATED_USER"] = remoteLegalName val localAddress = channel.localAddress() as InetSocketAddress val remoteAddress = channel.remoteAddress() as InetSocketAddress diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt index e4e2f53417..69a3efd3e7 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt @@ -13,13 +13,7 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic * servers from trying to instantiate any of them. */ -val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.RPCClient, - null) + val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index a94aad8786..7e660ca78e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -114,8 +114,9 @@ open class SerializationFactoryImpl( val lookupKey = magic to target return schemes.computeIfAbsent(lookupKey) { registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single? - logger.warn("Cannot find serialization scheme for: $lookupKey, registeredSchemes are: $registeredSchemes") - throw UnsupportedOperationException("Serialization scheme not supported.") + logger.warn("Cannot find serialization scheme for: [$lookupKey, " + + "${if (magic == amqpMagic) "AMQP" else if (magic == kryoMagic) "Kryo" else "UNKNOWN MAGIC"}] registeredSchemes are: $registeredSchemes") + throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.") } to magic } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index ae2e8cdb67..2eb7485d6a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -17,13 +17,6 @@ import net.corda.nodeapi.internal.serialization.kryo.kryoMagic * MUST be kept separate! */ -val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(kryoMagic, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.RPCServer, - null) val AMQP_STORAGE_CONTEXT = SerializationContextImpl(amqpMagic, SerializationDefaults.javaClass.classLoader, diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 505228a377..edba2095ed 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -5,6 +5,7 @@ package net.corda.nodeapi.internal.serialization.amqp import io.github.lukehutch.fastclasspathscanner.FastClasspathScanner import net.corda.core.cordapp.Cordapp import net.corda.core.internal.objectOrNewInstance +import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.CordaSerializationMagic @@ -12,7 +13,6 @@ import net.corda.nodeapi.internal.serialization.DefaultWhitelist import net.corda.nodeapi.internal.serialization.MutableClassWhitelist import net.corda.nodeapi.internal.serialization.SerializationScheme import java.lang.reflect.Modifier -import java.security.PublicKey import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -118,6 +118,12 @@ abstract class AbstractAMQPSerializationScheme( factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) } } + + context.properties[ContextPropertyKeys.SERIALIZERS]?.apply { + uncheckedCast>>(this).forEach { + factory.register(it) + } + } } /* @@ -131,7 +137,9 @@ abstract class AbstractAMQPSerializationScheme( protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory - protected open val publicKeySerializer: CustomSerializer.Implements = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer + + // Not used as a simple direct import to facilitate testing + open val publicKeySerializer : CustomSerializer<*> = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer private fun getSerializerFactory(context: SerializationContext): SerializerFactory { return serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) { @@ -162,52 +170,3 @@ abstract class AbstractAMQPSerializationScheme( protected fun canDeserializeVersion(magic: CordaSerializationMagic) = magic == amqpMagic } - -// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented -class AMQPServerSerializationScheme( - cordappCustomSerializers: Set>, - serializerFactoriesForContexts: MutableMap, SerializerFactory> -) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { - constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) - - constructor() : this(emptySet(), ConcurrentHashMap()) - - override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return canDeserializeVersion(magic) && - (target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage) - } - -} - -// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented -class AMQPClientSerializationScheme( - cordappCustomSerializers: Set>, - serializerFactoriesForContexts: MutableMap, SerializerFactory> -) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { - constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) - - constructor() : this(emptySet(), ConcurrentHashMap()) - - override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { - throw UnsupportedOperationException() - } - - override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return canDeserializeVersion(magic) && - (target == SerializationContext.UseCase.P2P || target == SerializationContext.UseCase.Storage) - } - -} - diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt index 8a56baf1b0..1ea7daf020 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt @@ -285,7 +285,11 @@ open class SerializerFactory( } private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer = serializersByType.computeIfAbsent(type) { - if (isPrimitive(clazz)) { + if (clazz.isSynthetic) { + // Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also + // captures Lambda expressions and other anonymous functions + throw NotSerializableException(type.typeName) + } else if (isPrimitive(clazz)) { AMQPPrimitiveSerializer(clazz) } else { findCustomSerializer(clazz, declaredType) ?: run { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt index 99dd1b9ff6..7c399d5a68 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/ClassSerializer.kt @@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.serialization.amqp.custom import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory -import net.corda.nodeapi.internal.serialization.amqp.custom.ClassSerializer.ClassProxy /** * A serializer for [Class] that uses [ClassProxy] proxy object to write out diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt index faa4f5eebf..0ab61b41c8 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/InputStreamSerializer.kt @@ -39,7 +39,7 @@ object InputStreamSerializer : CustomSerializer.Implements(InputStr override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput, context: SerializationContext - ): InputStream { + ) : InputStream { val bits = input.readObject(obj, schemas, ByteArray::class.java, context) as ByteArray return bits.inputStream() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt new file mode 100644 index 0000000000..07dcd561a8 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/custom/RxNotificationSerializer.kt @@ -0,0 +1,28 @@ +package net.corda.nodeapi.internal.serialization.amqp.custom + +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import rx.Notification + +class RxNotificationSerializer( + factory: SerializerFactory +) : CustomSerializer.Proxy, RxNotificationSerializer.Proxy>( + Notification::class.java, + Proxy::class.java, + factory +) { + data class Proxy( + val kind: Notification.Kind, + val t: Throwable?, + val value: Any?) + + override fun toProxy(obj: Notification<*>) = Proxy(obj.kind, obj.throwable, obj.value) + + override fun fromProxy(proxy: Proxy): Notification<*> { + return when (proxy.kind) { + Notification.Kind.OnCompleted -> Notification.createOnCompleted() + Notification.Kind.OnError -> Notification.createOnError(proxy.t) + Notification.Kind.OnNext -> Notification.createOnNext(proxy.value) + } + } +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt index 7a5111fd93..df8c3814ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt @@ -7,7 +7,6 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer import com.esotericsoftware.kryo.serializers.FieldSerializer import com.esotericsoftware.kryo.util.MapReferenceResolver -import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.PrivacySalt import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash @@ -18,8 +17,6 @@ import net.corda.core.serialization.SerializationContext.UseCase.Checkpoint import net.corda.core.serialization.SerializationContext.UseCase.Storage import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SerializedBytes -import net.corda.core.toFuture -import net.corda.core.toObservable import net.corda.core.transactions.* import net.corda.core.utilities.OpaqueBytes import net.corda.nodeapi.internal.crypto.X509CertificateFactory @@ -27,7 +24,6 @@ import net.corda.nodeapi.internal.serialization.CordaClassResolver import net.corda.nodeapi.internal.serialization.serializationContextKey import org.slf4j.Logger import org.slf4j.LoggerFactory -import rx.Observable import java.io.InputStream import java.lang.reflect.InvocationTargetException import java.security.PrivateKey @@ -46,39 +42,16 @@ import kotlin.reflect.jvm.isAccessible import kotlin.reflect.jvm.javaType /** - * Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead - * simple, totally non-extensible binary (sub)format. - * - * This is NOT what should be used in any final platform product, rather, the final state should be a precisely - * specified and standardised binary format with attention paid to anti-malleability, versioning and performance. - * FIX SBE is a potential candidate: it prioritises performance over convenience and was designed for HFT. Google - * Protocol Buffers with a minor tightening to make field reordering illegal is another possibility. - * - * FIX SBE: - * https://real-logic.github.io/simple-binary-encoding/ - * http://mechanical-sympathy.blogspot.co.at/2014/05/simple-binary-encoding.html - * Protocol buffers: - * https://developers.google.com/protocol-buffers/ - * - * But for now we use Kryo to maximise prototyping speed. - * - * Note that this code ignores *ALL* concerns beyond convenience, in particular it ignores: - * - * - Performance - * - Security - * - * This code will happily deserialise literally anything, including malicious streams that would reconstruct classes - * in invalid states, thus violating system invariants. It isn't designed to handle malicious streams and therefore, - * isn't usable beyond the prototyping stage. But that's fine: we can revisit serialisation technologies later after - * a formal evaluation process. - * - * We now distinguish between internal, storage related Kryo and external, network facing Kryo. We presently use - * some non-whitelisted classes as part of internal storage. - * TODO: eliminate internal, storage related whitelist issues, such as private keys in blob storage. + * Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead + * simple, totally non-extensible binary (sub)format. Used exclusively within Corda for checkpointing flows as + * it will happily deserialise literally anything, including malicious streams that would reconstruct classes + * in invalid states and thus violating system invariants. In the context of checkpointing a Java stack, this is + * absolutely the functionality we desire, for a stable binary wire format and persistence technology, we have + * the AMQP implementation. */ /** - * A serialiser that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure + * A serializer that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure * type safety hack. */ object SerializedBytesSerializer : Serializer>() { @@ -391,44 +364,6 @@ open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapRefe } } -/** - * The Kryo used for the RPC wire protocol. - */ -// Every type in the wire protocol is listed here explicitly. -// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes, -// because we can see everything we're using in one place. -class RPCKryo(observableSerializer: Serializer>, serializationContext: SerializationContext) : CordaKryo(CordaClassResolver(serializationContext)) { - init { - DefaultKryoCustomizer.customize(this) - - // RPC specific classes - register(InputStream::class.java, InputStreamSerializer) - register(Observable::class.java, observableSerializer) - register(CordaFuture::class, - read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java).toFuture() }, - write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) } - ) - } - - override fun getRegistration(type: Class<*>): Registration { - if (Observable::class.java != type && Observable::class.java.isAssignableFrom(type)) { - return super.getRegistration(Observable::class.java) - } - if (InputStream::class.java != type && InputStream::class.java.isAssignableFrom(type)) { - return super.getRegistration(InputStream::class.java) - } - if (CordaFuture::class.java != type && CordaFuture::class.java.isAssignableFrom(type)) { - return super.getRegistration(CordaFuture::class.java) - } - type.requireExternal("RPC not allowed to deserialise internal classes") - return super.getRegistration(type) - } - - private fun Class<*>.requireExternal(msg: String) { - require(!name.startsWith("net.corda.node.") && ".internal" !in name) { "$msg: $name" } - } -} - inline fun Kryo.register( type: KClass, crossinline read: (Kryo, Input) -> T, diff --git a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java index db8da10b45..8ae47ea391 100644 --- a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java +++ b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java @@ -4,13 +4,13 @@ import com.google.common.collect.Maps; import net.corda.core.serialization.SerializationContext; import net.corda.core.serialization.SerializationFactory; import net.corda.core.serialization.SerializedBytes; -import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer; -import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt; +import net.corda.nodeapi.internal.serialization.amqp.SchemaKt; import net.corda.testing.core.SerializationEnvironmentRule; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import java.io.NotSerializableException; import java.io.Serializable; import java.util.EnumSet; import java.util.concurrent.Callable; @@ -33,20 +33,17 @@ public final class ForbiddenLambdaSerializationTests { @Test public final void serialization_fails_for_serializable_java_lambdas() { contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(), + SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null); String value = "Hey"; Callable target = (Callable & Serializable) () -> value; Throwable throwable = catchThrowable(() -> serialize(target, context)); - assertThat(throwable).isNotNull(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) { - assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE); - } else { - assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes"); - } + assertThat(throwable) + .isNotNull() + .isInstanceOf(NotSerializableException.class) + .hasMessageContaining(getClass().getName()); }); } @@ -54,21 +51,17 @@ public final class ForbiddenLambdaSerializationTests { @SuppressWarnings("unchecked") public final void serialization_fails_for_not_serializable_java_lambdas() { contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoMagic(), + SerializationContext context = new SerializationContextImpl(SchemaKt.getAmqpMagic(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx, null); String value = "Hey"; Callable target = () -> value; Throwable throwable = catchThrowable(() -> serialize(target, context)); - assertThat(throwable).isNotNull(); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - assertThat(throwable).isInstanceOf(IllegalArgumentException.class); - if (ctx != SerializationContext.UseCase.RPCServer && ctx != SerializationContext.UseCase.Storage) { - assertThat(throwable).hasMessage(CordaClosureBlacklistSerializer.ERROR_MESSAGE); - } else { - assertThat(throwable).hasMessageContaining("RPC not allowed to deserialise internal classes"); - } + assertThat(throwable) + .isNotNull() + .isInstanceOf(NotSerializableException.class) + .hasMessageContaining(getClass().getName()); }); } diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt index 5986848c74..ed9345c00e 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt @@ -8,13 +8,13 @@ import net.corda.core.internal.div import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.node.serialization.kryo.KryoServerSerializationScheme +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.SerializationContextImpl import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.kryo.kryoMagic +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.TestIdentity @@ -335,8 +335,8 @@ class X509UtilitiesTest { @Test fun `serialize - deserialize X509Certififcate`() { - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } - val context = SerializationContextImpl(kryoMagic, + val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) } + val context = SerializationContextImpl(amqpMagic, javaClass.classLoader, AllWhitelist, emptyMap(), @@ -351,8 +351,8 @@ class X509UtilitiesTest { @Test fun `serialize - deserialize X509CertPath`() { - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } - val context = SerializationContextImpl(kryoMagic, + val factory = SerializationFactoryImpl().apply { registerScheme(AMQPServerSerializationScheme()) } + val context = SerializationContextImpl(amqpMagic, javaClass.classLoader, AllWhitelist, emptyMap(), diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt index d3bea49d1b..468a46a465 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt @@ -30,6 +30,7 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationFactory import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.OpaqueBytes +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.DEV_INTERMEDIATE_CA import net.corda.nodeapi.internal.crypto.ContentSignerBuilder import net.corda.nodeapi.internal.serialization.AllWhitelist diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index 7f8a8c302b..8607d04d8c 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.KryoSerializable import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.primitives.Ints import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever @@ -12,9 +13,9 @@ import net.corda.core.contracts.PrivacySalt import net.corda.core.crypto.* import net.corda.core.internal.FetchDataFlow import net.corda.core.serialization.* +import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence -import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.persistence.NodeAttachmentService import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.ALICE_NAME @@ -34,6 +35,17 @@ import java.time.Instant import java.util.* import kotlin.test.* +class TestScheme : AbstractKryoSerializationScheme() { + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient + } + + override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + + override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + +} + @RunWith(Parameterized::class) class KryoTests(private val compression: CordaSerializationEncoding?) { companion object { @@ -48,7 +60,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { @Before fun setup() { - factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } + factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) } context = SerializationContextImpl(kryoMagic, javaClass.classLoader, AllWhitelist, @@ -76,11 +88,12 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { assertThat(bits.deserialize(factory, context)).isEqualTo(Person("bob", null)) } + @Test fun `serialised form is stable when the same object instance is added to the deserialised object graph`() { val noReferencesContext = context.withoutReferences() - val obj = Ints.toByteArray(0x01234567).sequence() - val originalList = arrayListOf(obj) + val obj : ByteSequence = Ints.toByteArray(0x01234567).sequence() + val originalList : ArrayList = arrayListOf(obj) val deserialisedList = originalList.serialize(factory, noReferencesContext).deserialize(factory, noReferencesContext) originalList += obj deserialisedList += obj @@ -268,7 +281,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { } } Tmp() - val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) } + val factory = SerializationFactoryImpl().apply { registerScheme(TestScheme()) } val context = SerializationContextImpl(kryoMagic, javaClass.classLoader, AllWhitelist, diff --git a/node/src/integration-test/kotlin/net/corda/node/BootTests.kt b/node/src/integration-test/kotlin/net/corda/node/BootTests.kt index da6c55249c..375c96f0b7 100644 --- a/node/src/integration-test/kotlin/net/corda/node/BootTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/BootTests.kt @@ -2,6 +2,7 @@ package net.corda.node import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient +import net.corda.core.CordaRuntimeException import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC import net.corda.core.internal.div @@ -11,6 +12,7 @@ import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.node.internal.NodeStartup import net.corda.node.services.Permissions.Companion.startFlow +import net.corda.nodeapi.exceptions.InternalNodeException import net.corda.testing.common.internal.ProjectStructure.projectRootDir import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters @@ -28,8 +30,11 @@ class BootTests { fun `java deserialization is disabled`() { driver { val user = User("u", "p", setOf(startFlow())) - val future = CordaRPCClient(startNode(rpcUsers = listOf(user)).getOrThrow().rpcAddress).start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue - assertThatThrownBy { future.getOrThrow() }.isInstanceOf(InvalidClassException::class.java).hasMessage("filter status: REJECTED") + val future = CordaRPCClient(startNode(rpcUsers = listOf(user)).getOrThrow().rpcAddress). + start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue + assertThatThrownBy { future.getOrThrow() } + .isInstanceOf(CordaRuntimeException::class.java) + .hasMessageContaining(InternalNodeException.defaultMessage()) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt index 21c6be7f50..d6714f506f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcExceptionHandlingTest.kt @@ -33,12 +33,12 @@ class RpcExceptionHandlingTest { val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow() - assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() } - .isInstanceOfSatisfying(InternalNodeException::class.java) { exception -> - assertThat(exception).hasNoCause() - assertThat(exception.stackTrace).isEmpty() - assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) - } + assertThatCode { node.rpc.startFlow(::Flow).returnValue.getOrThrow() }.isInstanceOfSatisfying(InternalNodeException::class.java) { exception -> + + assertThat(exception).hasNoCause() + assertThat(exception.stackTrace).isEmpty() + assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) + } } } @@ -49,12 +49,12 @@ class RpcExceptionHandlingTest { val node = startNode(NodeParameters(rpcUsers = users)).getOrThrow() val clientRelevantMessage = "This is for the players!" - assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() } - .isInstanceOfSatisfying(ClientRelevantException::class.java) { exception -> - assertThat(exception).hasNoCause() - assertThat(exception.stackTrace).isEmpty() - assertThat(exception.message).isEqualTo(clientRelevantMessage) - } + assertThatCode { node.rpc.startFlow(::ClientRelevantErrorFlow, clientRelevantMessage).returnValue.getOrThrow() }.isInstanceOfSatisfying(ClientRelevantException::class.java) { exception -> + + assertThat(exception).hasNoCause() + assertThat(exception.stackTrace).isEmpty() + assertThat(exception.message).isEqualTo(clientRelevantMessage) + } } } @@ -81,10 +81,11 @@ class RpcExceptionHandlingTest { assertThatCode { nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() } .isInstanceOfSatisfying(InternalNodeException::class.java) { exception -> - assertThat(exception).hasNoCause() - assertThat(exception.stackTrace).isEmpty() - assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) - } + + assertThat(exception).hasNoCause() + assertThat(exception.stackTrace).isEmpty() + assertThat(exception.message).isEqualTo(InternalNodeException.defaultMessage()) + } } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 16dcdeec41..944faf6579 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -1,7 +1,7 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.Emoji import net.corda.core.internal.concurrent.openFuture @@ -26,6 +26,7 @@ import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService @@ -42,7 +43,6 @@ import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.bridging.BridgeControlListener import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.serialization.* -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Scheduler @@ -384,15 +384,15 @@ open class Node(configuration: NodeConfiguration, val classloader = cordappLoader.appClassLoader nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { - registerScheme(KryoServerSerializationScheme()) registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps)) - registerScheme(KryoClientSerializationScheme()) + registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps)) + registerScheme(KryoServerSerializationScheme() ) }, p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader), - rpcServerContext = KRYO_RPC_SERVER_CONTEXT.withClassLoader(classloader), + rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader), storageContext = AMQP_STORAGE_CONTEXT.withClassLoader(classloader), checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader), - rpcClientContext = if (configuration.shouldInitCrashShell()) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node + rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node } private var rpcMessagingClient: RPCMessagingClient? = null diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt new file mode 100644 index 0000000000..dc9ea63428 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt @@ -0,0 +1,46 @@ +package net.corda.node.serialization.amqp + +import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationCustomSerializer +import net.corda.nodeapi.internal.serialization.CordaSerializationMagic +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.amqp.custom.RxNotificationSerializer + +import java.util.concurrent.ConcurrentHashMap + +/** + * When set as the serialization scheme, defines the RPC Server serialization scheme as using the Corda + * AMQP implementation. + */ +class AMQPServerSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> +) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + constructor() : this(emptySet(), ConcurrentHashMap()) + + override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { + throw UnsupportedOperationException() + } + + override fun rpcServerSerializerFactory(context: SerializationContext) = + SerializerFactory( + context.whitelist, + context.deserializationClassLoader + ).apply { + register(RpcServerObservableSerializer()) + register(RpcServerCordaFutureSerializer(this)) + register(RxNotificationSerializer(this)) + } + + override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { + return canDeserializeVersion(magic) && + ( target == SerializationContext.UseCase.P2P + || target == SerializationContext.UseCase.Storage + || target == SerializationContext.UseCase.RPCServer) + } +} diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt new file mode 100644 index 0000000000..f66e3c5daf --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerCordaFutureSerialiser.kt @@ -0,0 +1,35 @@ +package net.corda.node.serialization.amqp + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.toObservable +import net.corda.nodeapi.internal.serialization.amqp.CustomSerializer +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import rx.Observable +import java.io.NotSerializableException + +/** + * Serializer for [CordaFuture] objects where Futures are converted to Observables and + * are thus dealt with by the [RpcServerObservableSerializer] + */ +class RpcServerCordaFutureSerializer(factory: SerializerFactory) + : CustomSerializer.Proxy, + RpcServerCordaFutureSerializer.FutureProxy>( + CordaFuture::class.java, RpcServerCordaFutureSerializer.FutureProxy::class.java, factory +) { + override fun fromProxy(proxy: RpcServerCordaFutureSerializer.FutureProxy): CordaFuture<*> { + throw UnsupportedOperationException() + } + + override fun toProxy(obj: CordaFuture<*>): RpcServerCordaFutureSerializer.FutureProxy { + try { + return FutureProxy(obj.toObservable()) + } catch (e: NotSerializableException) { + throw (NotSerializableException("Failed to serialize Future as proxy Observable - ${e.message}")) + } + } + + data class FutureProxy(val observable: Observable<*>) +} + + + diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt new file mode 100644 index 0000000000..312a4906cc --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt @@ -0,0 +1,140 @@ +package net.corda.node.serialization.amqp + +import net.corda.core.context.Trace +import net.corda.core.serialization.SerializationContext +import net.corda.core.utilities.loggerFor +import net.corda.node.services.messaging.ObservableContextInterface +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.internal.serialization.amqp.* +import org.apache.qpid.proton.codec.Data + +import rx.Notification +import rx.Observable +import rx.Subscriber +import java.io.NotSerializableException + +import java.lang.reflect.Type + +/** + * Server side serializer that notionally serializes RxObservables when used by the RPC + * framework for event subscriptions. Notional in the sense that the actual observable + * isn't serialized, rather a reference to the observable is, this is then used by + * the client side RPC handler to subscribe to the observable stream. + */ +class RpcServerObservableSerializer : CustomSerializer.Implements>( + Observable::class.java +) { + // Would be great to make this private, but then it's so much harder to unit test + object RpcObservableContextKey + + companion object { + fun createContext( + serializationContext: SerializationContext, + observableContext: ObservableContextInterface + ) = serializationContext.withProperty( + RpcServerObservableSerializer.RpcObservableContextKey, observableContext) + } + + override val schemaForDocumentation = Schema( + listOf( + CompositeType( + name = type.toString(), + label = "", + provides = emptyList(), + descriptor = descriptor, + fields = listOf( + Field( + name = "observableId", + type = "string", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false), + Field( + name = "observableInstant", + type = "long", + requires = emptyList(), + default = null, + label = null, + mandatory = true, + multiple = false) + )))) + + override fun readObject( + obj: Any, schemas: SerializationSchemas, + input: DeserializationInput, + context: SerializationContext + ): Observable<*> { + throw UnsupportedOperationException() + } + + override fun writeDescribedObject( + obj: Observable<*>, + data: Data, + type: Type, + output: SerializationOutput, + context: SerializationContext + ) { + val observableId = Trace.InvocationId.newInstance() + if (RpcServerObservableSerializer.RpcObservableContextKey !in context.properties) { + throw NotSerializableException("Missing Observable Key on serialization context - $type") + } + + val observableContext = context.properties[RpcServerObservableSerializer.RpcObservableContextKey] + as ObservableContextInterface + + data.withList { + data.putString(observableId.value) + data.putLong(observableId.timestamp.toEpochMilli()) + } + + val observableWithSubscription = ObservableSubscription( + subscription = obj.materialize().subscribe( + object : Subscriber>() { + override fun onNext(observation: Notification<*>) { + if (!isUnsubscribed) { + val message = RPCApi.ServerToClient.Observation( + id = observableId, + content = observation, + deduplicationIdentity = observableContext.deduplicationIdentity + ) + observableContext.sendMessage(message) + } + } + + override fun onError(exception: Throwable) { + loggerFor().error( + "onError called in materialize()d RPC Observable", exception) + } + + override fun onCompleted() { + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables != null) { + observables.remove(observableId) + if (observables.isEmpty()) { + null + } else { + observables + } + } else { + null + } + } + } + } + ) + ) + + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables == null) { + hashSetOf(observableId) + } else { + observables.add(observableId) + observables + } + } + observableContext.observableMap.put(observableId, observableWithSubscription) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt index acf4c7f461..83eec03995 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt @@ -4,22 +4,15 @@ import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.serialization.SerializationContext import net.corda.nodeapi.internal.serialization.CordaSerializationMagic import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -import net.corda.nodeapi.internal.serialization.kryo.RPCKryo class KryoServerSerializationScheme : AbstractKryoSerializationScheme() { override fun canDeserializeVersion(magic: CordaSerializationMagic, target: SerializationContext.UseCase): Boolean { - return magic == kryoMagic && target != SerializationContext.UseCase.RPCClient + return magic == kryoMagic && target == SerializationContext.UseCase.Checkpoint } override fun rpcClientKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() - override fun rpcServerKryoPool(context: SerializationContext): KryoPool { - return KryoPool.Builder { - DefaultKryoCustomizer.customize(RPCKryo(RpcServerObservableSerializer, context), publicKeySerializer).apply { - classLoader = context.deserializationClassLoader - } - }.build() - } + override fun rpcServerKryoPool(context: SerializationContext): KryoPool = throw UnsupportedOperationException() + } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt deleted file mode 100644 index 51013a9bce..0000000000 --- a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt +++ /dev/null @@ -1,87 +0,0 @@ -package net.corda.node.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import net.corda.core.context.Trace -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationDefaults -import net.corda.node.services.messaging.ObservableSubscription -import net.corda.node.services.messaging.RPCServer -import net.corda.nodeapi.RPCApi -import org.slf4j.LoggerFactory -import rx.Notification -import rx.Observable -import rx.Subscriber - -object RpcServerObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - private val log = LoggerFactory.getLogger(javaClass) - fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { - return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) - } - - override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { - throw UnsupportedOperationException() - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - val observableId = Trace.InvocationId.newInstance() - val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext - output.writeInvocationId(observableId) - val observableWithSubscription = ObservableSubscription( - // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing - // must be done again within the subscriber - subscription = observable.materialize().subscribe( - object : Subscriber>() { - override fun onNext(observation: Notification<*>) { - if (!isUnsubscribed) { - val message = RPCApi.ServerToClient.Observation( - id = observableId, - content = observation, - deduplicationIdentity = observableContext.deduplicationIdentity - ) - observableContext.sendMessage(message) - } - } - - override fun onError(exception: Throwable) { - log.error("onError called in materialize()d RPC Observable", exception) - } - - override fun onCompleted() { - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables != null) { - observables.remove(observableId) - if (observables.isEmpty()) { - null - } else { - observables - } - } else { - null - } - } - } - } - ) - ) - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables == null) { - hashSetOf(observableId) - } else { - observables.add(observableId) - observables - } - } - observableContext.observableMap.put(observableId, observableWithSubscription) - } - - private fun Output.writeInvocationId(id: Trace.InvocationId) { - - writeString(id.value) - writeLong(id.timestamp.toEpochMilli()) - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt new file mode 100644 index 0000000000..32adde2e64 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ObservableContextInterface.kt @@ -0,0 +1,21 @@ +package net.corda.node.services.messaging + +import com.github.benmanes.caffeine.cache.Cache +import net.corda.core.context.Trace +import net.corda.nodeapi.RPCApi +import org.apache.activemq.artemis.api.core.SimpleString +import java.util.concurrent.ConcurrentHashMap + +/** + * An observable context is constructed on each RPC request. If subsequently a nested Observable is encountered this + * same context is propagated by the serialization context. This way all observations rooted in a single RPC will be + * muxed correctly. Note that the context construction itself is quite cheap. + */ +interface ObservableContextInterface { + fun sendMessage(serverToClient: RPCApi.ServerToClient) + + val observableMap: Cache + val clientAddressToObservables: ConcurrentHashMap> + val deduplicationIdentity: String + val clientAddress: SimpleString +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 7c9ed1cf86..4574afffef 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -15,13 +15,14 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.LifeCycle import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT import net.corda.core.serialization.deserialize import net.corda.core.utilities.* import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.RPCSecurityManager -import net.corda.node.serialization.kryo.RpcServerObservableSerializer import net.corda.node.services.logging.pushToLoggingContext +import net.corda.node.serialization.amqp.RpcServerObservableSerializer import net.corda.nodeapi.RPCApi import net.corda.nodeapi.externalTrace import net.corda.nodeapi.impersonatedActor @@ -45,6 +46,8 @@ import java.util.* import java.util.concurrent.* import kotlin.concurrent.thread +private typealias ObservableSubscriptionMap = Cache + data class RPCServerConfiguration( /** The number of threads to use for handling RPC requests */ val rpcThreadPoolSize: Int, @@ -406,19 +409,22 @@ class RPCServer( /* * We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this - * same context is propagated by the instrumented KryoPool. This way all observations rooted in a single RPC will be + * same context is propagated by serialization context. This way all observations rooted in a single RPC will be * muxed correctly. Note that the context construction itself is quite cheap. */ inner class ObservableContext( - val observableMap: ObservableSubscriptionMap, - val clientAddressToObservables: ConcurrentHashMap>, - val deduplicationIdentity: String, - val clientAddress: SimpleString - ) { - private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this) + override val observableMap: ObservableSubscriptionMap, + override val clientAddressToObservables: ConcurrentHashMap>, + override val deduplicationIdentity: String, + override val clientAddress: SimpleString + ) : ObservableContextInterface { + private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext( + observableContext = this, + serializationContext = SerializationDefaults.RPC_SERVER_CONTEXT) - fun sendMessage(serverToClient: RPCApi.ServerToClient) { - sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, serializationContextWithObservableContext, serverToClient)) + override fun sendMessage(serverToClient: RPCApi.ServerToClient) { + sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, + serializationContextWithObservableContext, serverToClient)) } } @@ -478,4 +484,4 @@ class ObservableSubscription( val subscription: Subscription ) -typealias ObservableSubscriptionMap = Cache + diff --git a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt index 2e30a1b711..2f54a90cea 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NodeInfoWatcher.kt @@ -10,12 +10,12 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.core.utilities.seconds +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NodeInfoFilesCopier import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import rx.Observable import rx.Scheduler import java.nio.file.Path diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt new file mode 100644 index 0000000000..4467a271d6 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -0,0 +1,101 @@ +package net.corda.node.internal.serialization + +import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext +import net.corda.core.internal.ThreadBox +import net.corda.core.context.Trace +import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme +import net.corda.node.internal.serialization.testutils.TestObservableContext as ServerObservableContext +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput +import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput + +import co.paralleluniverse.common.util.SameThreadExecutor +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.RemovalListener +import com.nhaarman.mockito_kotlin.mock +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.node.internal.serialization.testutils.serializationContext +import net.corda.node.serialization.amqp.RpcServerObservableSerializer +import net.corda.nodeapi.RPCApi +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Test +import rx.Notification +import rx.Observable +import rx.Subscription +import rx.subjects.UnicastSubject +import java.time.Instant +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit + +class RoundTripObservableSerializerTests { + private fun getID() = Trace.InvocationId("test1", Instant.now()) + + private fun subscriptionMap( + id: Trace.InvocationId + ) : Cache { + val subMap: Cache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) + .maximumSize(100) + .build() + + subMap.put(id, ObservableSubscription(mock())) + + return subMap + } + + private val observablesToReap = ThreadBox(object { + var observables = ArrayList() + }) + + private fun createRpcObservableMap(): Cache>> { + val onObservableRemove = RemovalListener>> { key, value, cause -> + val observableId = key!! + + observablesToReap.locked { observables.add(observableId) } + } + + return Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build() + } + + @Test + fun roundTripTest1() { + val serializationScheme = AMQPRoundTripRPCSerializationScheme( + serializationContext, emptySet(), ConcurrentHashMap()) + + // Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap, + // the client as a property of the deserializer which, in the actual RPC client, is pulled off of + // the received message + val id : Trace.InvocationId = getID() + + val serverObservableContext = ServerObservableContext( + subscriptionMap(id), + clientAddressToObservables = ConcurrentHashMap(), + deduplicationIdentity = "thisIsATest", + clientAddress = SimpleString("clientAddress")) + + val serverSerializer = serializationScheme.rpcServerSerializerFactory(serverObservableContext) + + val clientObservableContext = ClientObservableContext( + callSiteMap = null, + observableMap = createRpcObservableMap(), + hardReferenceStore = Collections.synchronizedSet(mutableSetOf>()) + ) + + val clientSerializer = serializationScheme.rpcClientSerializerFactory(clientObservableContext, id) + + + // What we're actually going to serialize then deserialize + val obs = Observable.create({ 12 }) + + val serverSerializationContext = RpcServerObservableSerializer.createContext( + serializationContext, serverObservableContext) + + val clientSerializationContext = RpcClientObservableSerializer.createContext( + serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id) + + + val blob = SerializationOutput(serverSerializer).serialize(obs, serverSerializationContext) + val obs2 = DeserializationInput(clientSerializer).deserialize(blob, clientSerializationContext) + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt new file mode 100644 index 0000000000..6778a2ef91 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RpcServerObservableSerializerTests.kt @@ -0,0 +1,84 @@ +package net.corda.node.internal.serialization + +import com.github.benmanes.caffeine.cache.Cache +import com.github.benmanes.caffeine.cache.Caffeine +import com.nhaarman.mockito_kotlin.mock +import net.corda.core.context.Trace +import net.corda.node.internal.serialization.testutils.* +import net.corda.node.serialization.amqp.RpcServerObservableSerializer +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.internal.serialization.AllWhitelist +import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory + +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Test +import rx.Observable +import rx.Subscription +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class RpcServerObservableSerializerTests { + + private fun subscriptionMap(): Cache { + val subMap: Cache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES) + .maximumSize(100) + .build() + + subMap.put(Trace.InvocationId("test1", Instant.now()), ObservableSubscription(mock())) + + return subMap + } + + @Test + fun canSerializerBeRegistered() { + val sf = SerializerFactory(AllWhitelist, javaClass.classLoader) + + try { + sf.register(RpcServerObservableSerializer()) + } catch (e: Exception) { + throw Error("Observable serializer must be registerable with factory, unexpected exception - ${e.message}") + } + } + + @Test + fun canAssociateWithContext() { + val observable = TestObservableContext( + subscriptionMap(), + clientAddressToObservables = ConcurrentHashMap(), + deduplicationIdentity = "thisIsATest", + clientAddress = SimpleString("clientAddress")) + + val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable) + + assertEquals(1, newContext.properties.size) + assertTrue(newContext.properties.containsKey(RpcServerObservableSerializer.RpcObservableContextKey)) + assertEquals(observable, newContext.properties[RpcServerObservableSerializer.RpcObservableContextKey]) + } + + @Test + fun serialiseFakeObservable() { + val testClientAddress = "clientAddres" + val observable = TestObservableContext( + subscriptionMap(), + clientAddressToObservables = ConcurrentHashMap(), + deduplicationIdentity = "thisIsATest", + clientAddress = SimpleString(testClientAddress)) + + val sf = SerializerFactory(AllWhitelist, javaClass.classLoader).apply { + register(RpcServerObservableSerializer()) + } + + val obs = Observable.create({ 12 }) + val newContext = RpcServerObservableSerializer.createContext(serializationContext, observable) + + try { + SerializationOutput(sf).serializeAndReturnSchema(obs, newContext) + } catch (e: Exception) { + throw Error("Serialization of observable should not throw - ${e.message}") + } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt new file mode 100644 index 0000000000..73a48147ba --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -0,0 +1,55 @@ +package net.corda.node.internal.serialization.testutils + +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.core.context.Trace +import net.corda.core.cordapp.Cordapp +import net.corda.core.serialization.ClassWhitelist +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationCustomSerializer +import net.corda.node.serialization.amqp.RpcServerObservableSerializer +import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.internal.serialization.AllWhitelist +import net.corda.nodeapi.internal.serialization.CordaSerializationMagic +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import java.util.concurrent.ConcurrentHashMap +import net.corda.client.rpc.internal.ObservableContext as ClientObservableContext + +/** + * Special serialization context for the round trip tests that allows for both server and client RPC + * operations + */ + + +class AMQPRoundTripRPCSerializationScheme( + private val serializationContext: SerializationContext, + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory>) + : AbstractAMQPSerializationScheme( + cordappCustomSerializers, serializerFactoriesForContexts +) { + override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { + return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { + register(RpcClientObservableSerializer) + } + } + + override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory { + return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { + register(RpcServerObservableSerializer()) + } + } + + override fun canDeserializeVersion( + magic: CordaSerializationMagic, + target: SerializationContext.UseCase) = true + + fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) = + rpcClientSerializerFactory( + RpcClientObservableSerializer.createContext(serializationContext, observableContext) + .withProperty(RPCApi.RpcRequestOrObservableIdKey, id)) + + fun rpcServerSerializerFactory(observableContext: TestObservableContext) = + rpcServerSerializerFactory( + RpcServerObservableSerializer.createContext(serializationContext, observableContext)) +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt new file mode 100644 index 0000000000..cbfcaa8f5c --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestObservableContext.kt @@ -0,0 +1,18 @@ +package net.corda.node.internal.serialization.testutils + +import com.github.benmanes.caffeine.cache.Cache +import net.corda.core.context.Trace +import net.corda.node.services.messaging.ObservableContextInterface +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.nodeapi.RPCApi +import org.apache.activemq.artemis.api.core.SimpleString +import java.util.concurrent.ConcurrentHashMap + +class TestObservableContext( + override val observableMap: Cache, + override val clientAddressToObservables: ConcurrentHashMap>, + override val deduplicationIdentity: String, + override val clientAddress: SimpleString +) : ObservableContextInterface { + override fun sendMessage(serverToClient: RPCApi.ServerToClient) { } +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt new file mode 100644 index 0000000000..54ad5e55ba --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/TestSerializationContext.kt @@ -0,0 +1,17 @@ +package net.corda.node.internal.serialization.testutils + +import net.corda.core.serialization.SerializationContext +import net.corda.nodeapi.internal.serialization.AllWhitelist +import net.corda.nodeapi.internal.serialization.SerializationContextImpl +import net.corda.nodeapi.internal.serialization.amqp.amqpMagic + +val serializationProperties: MutableMap = mutableMapOf() + +val serializationContext = SerializationContextImpl( + preferredSerializationVersion = amqpMagic, + deserializationClassLoader = ClassLoader.getSystemClassLoader(), + whitelist = AllWhitelist, + properties = serializationProperties, + objectReferencesEnabled = false, + useCase = SerializationContext.UseCase.Testing, + encoding = null) \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 5b3f1a964b..13abc74202 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -1,9 +1,9 @@ package net.corda.testing.node.internal import net.corda.client.mock.Generator -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture import net.corda.core.context.AuthServiceId import net.corda.core.context.Trace @@ -23,7 +23,7 @@ import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.RPCApi -import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT +import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.core.MAX_MESSAGE_SIZE import net.corda.testing.driver.JmxPolicy @@ -513,8 +513,8 @@ class RandomRpcUser { val hostAndPort = NetworkHostAndPort.parse(args[1]) val username = args[2] val password = args[3] - KryoClientSerializationScheme.initialiseSerialization() - val handle = RPCClient(hostAndPort, null, serializationContext = KRYO_RPC_CLIENT_CONTEXT).start(rpcClass, username, password) + AMQPClientSerializationScheme.initialiseSerialization() + val handle = RPCClient(hostAndPort, null, serializationContext = AMQP_RPC_CLIENT_CONTEXT).start(rpcClass, username, password) val callGenerators = rpcClass.declaredMethods.map { method -> Generator.sequence(method.parameters.map { generatorStore[it.type] ?: throw Exception("No generator for ${it.type}") diff --git a/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt b/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt index ee3936c4c2..a339d1040a 100644 --- a/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt +++ b/testing/smoke-test-utils/src/main/kotlin/net/corda/smoketesting/NodeProcess.kt @@ -2,7 +2,7 @@ package net.corda.smoketesting import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCConnection -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.internal.* import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger @@ -59,7 +59,7 @@ class NodeProcess( val javaPath: Path = Paths.get(System.getProperty("java.home"), "bin", "java") val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(systemDefault()) val defaultNetworkParameters = run { - KryoClientSerializationScheme.createSerializationEnv().asContextEnv { + AMQPClientSerializationScheme.createSerializationEnv().asContextEnv { // There are no notaries in the network parameters for smoke test nodes. If this is required then we would // need to introduce the concept of a "network" which predefines the notaries, like the driver and MockNetwork NetworkParametersCopier(testNetworkParameters()) diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt index 5cfe318790..d896710d26 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/core/SerializationTestHelpers.kt @@ -3,6 +3,7 @@ package net.corda.testing.core import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doAnswer import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.DoNotImplement import net.corda.core.internal.staticField import net.corda.core.serialization.internal.SerializationEnvironment import net.corda.core.serialization.internal.effectiveSerializationEnv diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt index ae2c734f96..abc75b4eba 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt @@ -2,13 +2,12 @@ package net.corda.testing.internal import com.nhaarman.mockito_kotlin.doNothing import com.nhaarman.mockito_kotlin.whenever -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.DoNotImplement +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.serialization.internal.* +import net.corda.node.serialization.amqp.AMQPServerSerializationScheme import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.serialization.* -import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme -import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import net.corda.testing.core.SerializationEnvironmentRule import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService @@ -31,16 +30,16 @@ fun withoutTestSerialization(callable: () -> T): T { // TODO: Delete this, s internal fun createTestSerializationEnv(label: String): SerializationEnvironmentImpl { val factory = SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(KryoServerSerializationScheme()) registerScheme(AMQPClientSerializationScheme(emptyList())) registerScheme(AMQPServerSerializationScheme(emptyList())) + // needed for checkpointing + registerScheme(KryoServerSerializationScheme()) } return object : SerializationEnvironmentImpl( factory, AMQP_P2P_CONTEXT, - KRYO_RPC_SERVER_CONTEXT, - KRYO_RPC_CLIENT_CONTEXT, + AMQP_RPC_SERVER_CONTEXT, + AMQP_RPC_CLIENT_CONTEXT, AMQP_STORAGE_CONTEXT, KRYO_CHECKPOINT_CONTEXT ) { diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt index df5a534352..4b909ed4f6 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/DemoBench.kt @@ -1,13 +1,12 @@ package net.corda.demobench import javafx.scene.image.Image -import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme +import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.demobench.views.DemoBenchView import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl -import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme import tornadofx.* import java.io.InputStreamReader import java.nio.charset.StandardCharsets.UTF_8 @@ -59,8 +58,7 @@ class DemoBench : App(DemoBenchView::class) { private fun initialiseSerialization() { nodeSerializationEnv = SerializationEnvironmentImpl( SerializationFactoryImpl().apply { - registerScheme(KryoClientSerializationScheme()) - registerScheme(AMQPClientSerializationScheme()) + registerScheme(AMQPClientSerializationScheme(emptyList())) }, AMQP_P2P_CONTEXT) } diff --git a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt index 919acd78cf..9aceb2cff2 100644 --- a/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt +++ b/tools/shell/src/integration-test/kotlin/net/corda/tools/shell/SSHServerTest.kt @@ -22,6 +22,7 @@ import org.assertj.core.api.Assertions.assertThat import org.bouncycastle.util.io.Streams import org.junit.Ignore import org.junit.Test +import java.lang.Thread.sleep import java.net.ConnectException import kotlin.test.assertTrue import kotlin.test.fail @@ -55,7 +56,7 @@ class SSHServerTest { // The driver will automatically pick up the annotated flows below driver { val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), - customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + customOverrides = mapOf("sshd" to mapOf("port" to 2222)) /*, startInSameProcess = true */) node.getOrThrow() val session = JSch().getSession("u", "localhost", 2222)