From c24d916f5a01c8f7502baef689252fc0ba4d2149 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Mon, 6 Aug 2018 09:11:15 +0100 Subject: [PATCH] RPC changes (#3697) * Eliminate un-necessary reference to Kryo in the comment * RPC documentation update. * Rename RpcClientObservableSerializer into RpcClientObservableDeSerializer --- .../corda/client/rpc/internal/RPCClientProxyHandler.kt | 6 +++--- .../serialization/amqp/AMQPClientSerializationScheme.kt | 3 +-- ...leSerializer.kt => RpcClientObservableDeSerializer.kt} | 8 ++++---- docs/source/clientrpc.rst | 5 ++++- .../serialization/amqp/RpcServerObservableSerializer.kt | 2 ++ .../serialization/RoundTripObservableSerializerTests.kt | 4 ++-- .../serialization/testutils/AMQPTestSerialiationScheme.kt | 6 +++--- 7 files changed, 19 insertions(+), 15 deletions(-) rename client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/{RpcClientObservableSerializer.kt => RpcClientObservableDeSerializer.kt} (91%) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 45d65c6644..35c1baccaf 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -10,7 +10,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion -import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId @@ -155,7 +155,7 @@ class RPCClientProxyHandler( private val observablesToReap = ThreadBox(object { var observables = ArrayList() }) - private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext) + private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) private fun createRpcObservableMap(): RpcObservableMap { val onObservableRemove = RemovalListener>> { key, _, cause -> @@ -559,7 +559,7 @@ private typealias RpcReplyMap = ConcurrentHashMap /** - * Holds a context available during Kryo deserialisation of messages that are expected to contain Observables. + * Holds a context available during de-serialisation of messages that are expected to contain Observables. * * @param observableMap holds the Observables that are ultimately exposed to the user. * @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to. diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt index aa0e6618aa..df12645479 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt @@ -14,7 +14,6 @@ import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap import net.corda.serialization.internal.amqp.SerializerFactory import net.corda.serialization.internal.amqp.amqpMagic import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer -import java.util.concurrent.ConcurrentHashMap /** * When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation. @@ -54,7 +53,7 @@ class AMQPClientSerializationScheme( override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply { - register(RpcClientObservableSerializer) + register(RpcClientObservableDeSerializer) register(RpcClientCordaFutureSerializer(this)) register(RxNotificationSerializer(this)) } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableDeSerializer.kt similarity index 91% rename from client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt rename to client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableDeSerializer.kt index 5536e10b4c..52e9dc7cab 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableSerializer.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/RpcClientObservableDeSerializer.kt @@ -17,11 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger import javax.transaction.NotSupportedException /** - * Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, - * just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized, + * De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, + * just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized, * what is actually sent is a reference to the observable that can then be subscribed to. */ -object RpcClientObservableSerializer : CustomSerializer.Implements>(Observable::class.java) { +object RpcClientObservableDeSerializer : CustomSerializer.Implements>(Observable::class.java) { private object RpcObservableContextKey fun createContext( @@ -83,7 +83,7 @@ object RpcClientObservableSerializer : CustomSerializer.Implements } val observableContext = - context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext + context.properties[RpcClientObservableDeSerializer.RpcObservableContextKey] as ObservableContext if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list") if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}") diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 4598787596..546744e8a6 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -275,9 +275,12 @@ will be freed automatically. printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection is non-deterministic. +.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass +Observables as parameters to the RPC methods. + Futures ------- -A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to +A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources. Versioning diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt index bb952e9788..7cdd638152 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/RpcServerObservableSerializer.kt @@ -65,6 +65,8 @@ class RpcServerObservableSerializer : CustomSerializer.Implements> input: DeserializationInput, context: SerializationContext ): Observable<*> { + // Note: this type of server Serializer is never meant to read postings arriving from clients. + // I.e. Observables cannot be used as parameters for RPC methods and can only be used as return values. throw UnsupportedOperationException() } diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt index 7289a9a959..9a0a8d042c 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -5,7 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalListener import com.nhaarman.mockito_kotlin.mock -import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Trace import net.corda.core.internal.ThreadBox import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme @@ -90,7 +90,7 @@ class RoundTripObservableSerializerTests { val serverSerializationContext = RpcServerObservableSerializer.createContext( serializationContext, serverObservableContext) - val clientSerializationContext = RpcClientObservableSerializer.createContext( + val clientSerializationContext = RpcClientObservableDeSerializer.createContext( serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id) diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt index c397920197..d17755fa4c 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -1,6 +1,6 @@ package net.corda.node.internal.serialization.testutils -import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer +import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Trace import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext @@ -29,7 +29,7 @@ class AMQPRoundTripRPCSerializationScheme( ) { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { - register(RpcClientObservableSerializer) + register(RpcClientObservableDeSerializer) } } @@ -45,7 +45,7 @@ class AMQPRoundTripRPCSerializationScheme( fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) = rpcClientSerializerFactory( - RpcClientObservableSerializer.createContext(serializationContext, observableContext) + RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) .withProperty(RPCApi.RpcRequestOrObservableIdKey, id)) fun rpcServerSerializerFactory(observableContext: TestObservableContext) =