RPC changes (#3697)

* Eliminate un-necessary reference to Kryo in the comment

* RPC documentation update.

* Rename RpcClientObservableSerializer into RpcClientObservableDeSerializer
This commit is contained in:
Viktor Kolomeyko
2018-08-06 09:11:15 +01:00
committed by GitHub
parent 40b922c1f2
commit c24d916f5a
7 changed files with 19 additions and 15 deletions

View File

@ -10,7 +10,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion import net.corda.client.rpc.RPCSinceVersion
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Actor import net.corda.core.context.Actor
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId import net.corda.core.context.Trace.InvocationId
@ -155,7 +155,7 @@ class RPCClientProxyHandler(
private val observablesToReap = ThreadBox(object { private val observablesToReap = ThreadBox(object {
var observables = ArrayList<InvocationId>() var observables = ArrayList<InvocationId>()
}) })
private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext) private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap { private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause -> val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
@ -559,7 +559,7 @@ private typealias RpcReplyMap = ConcurrentHashMap<InvocationId, SettableFuture<A
private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?> private typealias CallSiteMap = ConcurrentHashMap<InvocationId, Throwable?>
/** /**
* Holds a context available during Kryo deserialisation of messages that are expected to contain Observables. * Holds a context available during de-serialisation of messages that are expected to contain Observables.
* *
* @param observableMap holds the Observables that are ultimately exposed to the user. * @param observableMap holds the Observables that are ultimately exposed to the user.
* @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to. * @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.

View File

@ -14,7 +14,6 @@ import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap
import net.corda.serialization.internal.amqp.SerializerFactory import net.corda.serialization.internal.amqp.SerializerFactory
import net.corda.serialization.internal.amqp.amqpMagic import net.corda.serialization.internal.amqp.amqpMagic
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
import java.util.concurrent.ConcurrentHashMap
/** /**
* When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation. * When set as the serialization scheme for a process, sets it to be the Corda AMQP implementation.
@ -54,7 +53,7 @@ class AMQPClientSerializationScheme(
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply { return SerializerFactory(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply {
register(RpcClientObservableSerializer) register(RpcClientObservableDeSerializer)
register(RpcClientCordaFutureSerializer(this)) register(RpcClientCordaFutureSerializer(this))
register(RxNotificationSerializer(this)) register(RxNotificationSerializer(this))
} }

View File

@ -17,11 +17,11 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.transaction.NotSupportedException import javax.transaction.NotSupportedException
/** /**
* Serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects, * De-serializer for Rx[Observable] instances for the RPC Client library. Can only be used to deserialize such objects,
* just as the corresponding RPC server side code can only serialize them. Observables are only notionally serialized, * just as the corresponding RPC server side code ([RpcServerObservableSerializer]) can only serialize them. Observables are only notionally serialized,
* what is actually sent is a reference to the observable that can then be subscribed to. * what is actually sent is a reference to the observable that can then be subscribed to.
*/ */
object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) { object RpcClientObservableDeSerializer : CustomSerializer.Implements<Observable<*>>(Observable::class.java) {
private object RpcObservableContextKey private object RpcObservableContextKey
fun createContext( fun createContext(
@ -83,7 +83,7 @@ object RpcClientObservableSerializer : CustomSerializer.Implements<Observable<*>
} }
val observableContext = val observableContext =
context.properties[RpcClientObservableSerializer.RpcObservableContextKey] as ObservableContext context.properties[RpcClientObservableDeSerializer.RpcObservableContextKey] as ObservableContext
if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list") if (obj !is List<*>) throw NotSerializableException("Input must be a serialised list")
if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}") if (obj.size != 2) throw NotSerializableException("Expecting two elements, have ${obj.size}")

View File

@ -275,9 +275,12 @@ will be freed automatically.
printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection printed to the logs and the observable will be unsubscribed for you. But don't rely on this, as garbage collection
is non-deterministic. is non-deterministic.
.. note:: Observables can only be used as return arguments of an RPC call. It is not currently possible to pass
Observables as parameters to the RPC methods.
Futures Futures
------- -------
A method can also return a ``ListenableFuture`` in its object graph and it will be treated in a similar manner to A method can also return a ``CordaFuture`` in its object graph and it will be treated in a similar manner to
observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources. observables. Calling the ``cancel`` method on the future will unsubscribe it from any future value and release any resources.
Versioning Versioning

View File

@ -65,6 +65,8 @@ class RpcServerObservableSerializer : CustomSerializer.Implements<Observable<*>>
input: DeserializationInput, input: DeserializationInput,
context: SerializationContext context: SerializationContext
): Observable<*> { ): Observable<*> {
// Note: this type of server Serializer is never meant to read postings arriving from clients.
// I.e. Observables cannot be used as parameters for RPC methods and can only be used as return values.
throw UnsupportedOperationException() throw UnsupportedOperationException()
} }

View File

@ -5,7 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener import com.github.benmanes.caffeine.cache.RemovalListener
import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.mock
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme
@ -90,7 +90,7 @@ class RoundTripObservableSerializerTests {
val serverSerializationContext = RpcServerObservableSerializer.createContext( val serverSerializationContext = RpcServerObservableSerializer.createContext(
serializationContext, serverObservableContext) serializationContext, serverObservableContext)
val clientSerializationContext = RpcClientObservableSerializer.createContext( val clientSerializationContext = RpcClientObservableDeSerializer.createContext(
serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id) serializationContext, clientObservableContext).withProperty(RPCApi.RpcRequestOrObservableIdKey, id)

View File

@ -1,6 +1,6 @@
package net.corda.node.internal.serialization.testutils package net.corda.node.internal.serialization.testutils
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
@ -29,7 +29,7 @@ class AMQPRoundTripRPCSerializationScheme(
) { ) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactory(AllWhitelist, javaClass.classLoader).apply { return SerializerFactory(AllWhitelist, javaClass.classLoader).apply {
register(RpcClientObservableSerializer) register(RpcClientObservableDeSerializer)
} }
} }
@ -45,7 +45,7 @@ class AMQPRoundTripRPCSerializationScheme(
fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) = fun rpcClientSerializerFactory(observableContext: ClientObservableContext, id: Trace.InvocationId) =
rpcClientSerializerFactory( rpcClientSerializerFactory(
RpcClientObservableSerializer.createContext(serializationContext, observableContext) RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
.withProperty(RPCApi.RpcRequestOrObservableIdKey, id)) .withProperty(RPCApi.RpcRequestOrObservableIdKey, id))
fun rpcServerSerializerFactory(observableContext: TestObservableContext) = fun rpcServerSerializerFactory(observableContext: TestObservableContext) =