From 0d3c7e77623a6ba52c8733e2fffcc118751c723a Mon Sep 17 00:00:00 2001 From: Katelyn Baker Date: Wed, 2 May 2018 15:48:41 +0100 Subject: [PATCH] CORDA-847 - RPC Server lib refactoring (#3056) Just as we did for the RPC CLient, refactor kryo specific elements into their own sub module. Also move kryo specific components out of generic RPC files. Thus, adding AMQP support will be a much smoother operation --- docs/source/changelog.rst | 10 ++- .../internal/crypto/X509UtilitiesTest.kt | 2 +- .../internal/serialization/kryo/KryoTests.kt | 2 +- .../kotlin/net/corda/node/internal/Node.kt | 2 +- .../KryoServerSerializationScheme.kt | 3 +- .../kryo/RpcServerObservableSerializer.kt | 87 +++++++++++++++++++ .../node/services/messaging/RPCServer.kt | 80 +---------------- .../InternalSerializationTestHelpers.kt | 2 +- 8 files changed, 99 insertions(+), 89 deletions(-) rename node/src/main/kotlin/net/corda/node/serialization/{ => kryo}/KryoServerSerializationScheme.kt (91%) create mode 100644 node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index b1aa48af5d..3f7a0481bb 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,11 +6,13 @@ release, see :doc:`upgrade-notes`. Unreleased ========== - -* Refactor RPC Client Kryo observable serialiser into it's own sub module -* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialised in - a factory without a serialiser as the subtype check for the class instance failed. Fix is to compare the raw +* Refactor RPC Server Kryo observable serializer into it's own sub module + +* Refactor RPC Client Kryo observable serializer into it's own sub module + +* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialized in + a factory without a serializer as the subtype check for the class instance failed. Fix is to compare the raw type. * Due to ongoing work the experimental interfaces for defining custom notary services have been moved to the internal package. diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt index 5a76745aee..5986848c74 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt @@ -8,7 +8,7 @@ import net.corda.core.internal.div import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.serialization.AllWhitelist diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index d7acb47e7f..7f8a8c302b 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -14,7 +14,7 @@ import net.corda.core.internal.FetchDataFlow import net.corda.core.serialization.* import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.persistence.NodeAttachmentService import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.ALICE_NAME diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 3b4b2719f1..993a26e7a4 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -25,7 +25,7 @@ import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService import net.corda.node.services.config.* diff --git a/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt similarity index 91% rename from node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt rename to node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt index 2941bc479a..acf4c7f461 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt @@ -1,9 +1,8 @@ -package net.corda.node.serialization +package net.corda.node.serialization.kryo import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.serialization.SerializationContext import net.corda.nodeapi.internal.serialization.CordaSerializationMagic -import net.corda.node.services.messaging.RpcServerObservableSerializer import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt new file mode 100644 index 0000000000..51013a9bce --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt @@ -0,0 +1,87 @@ +package net.corda.node.serialization.kryo + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import net.corda.core.context.Trace +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.node.services.messaging.RPCServer +import net.corda.nodeapi.RPCApi +import org.slf4j.LoggerFactory +import rx.Notification +import rx.Observable +import rx.Subscriber + +object RpcServerObservableSerializer : Serializer>() { + private object RpcObservableContextKey + + private val log = LoggerFactory.getLogger(javaClass) + fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { + return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) + } + + override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { + throw UnsupportedOperationException() + } + + override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { + val observableId = Trace.InvocationId.newInstance() + val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext + output.writeInvocationId(observableId) + val observableWithSubscription = ObservableSubscription( + // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing + // must be done again within the subscriber + subscription = observable.materialize().subscribe( + object : Subscriber>() { + override fun onNext(observation: Notification<*>) { + if (!isUnsubscribed) { + val message = RPCApi.ServerToClient.Observation( + id = observableId, + content = observation, + deduplicationIdentity = observableContext.deduplicationIdentity + ) + observableContext.sendMessage(message) + } + } + + override fun onError(exception: Throwable) { + log.error("onError called in materialize()d RPC Observable", exception) + } + + override fun onCompleted() { + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables != null) { + observables.remove(observableId) + if (observables.isEmpty()) { + null + } else { + observables + } + } else { + null + } + } + } + } + ) + ) + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables == null) { + hashSetOf(observableId) + } else { + observables.add(observableId) + observables + } + } + observableContext.observableMap.put(observableId, observableWithSubscription) + } + + private fun Output.writeInvocationId(id: Trace.InvocationId) { + + writeString(id.value) + writeLong(id.timestamp.toEpochMilli()) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 39c8a8c3b5..5f7d193582 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -1,10 +1,6 @@ package net.corda.node.services.messaging import co.paralleluniverse.common.util.SameThreadExecutor -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalListener @@ -24,6 +20,7 @@ import net.corda.core.serialization.deserialize import net.corda.core.utilities.* import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.serialization.kryo.RpcServerObservableSerializer import net.corda.node.services.logging.pushToLoggingContext import net.corda.nodeapi.RPCApi import net.corda.nodeapi.externalTrace @@ -39,11 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.CoreNotificationType import org.apache.activemq.artemis.api.core.management.ManagementHelper -import org.slf4j.LoggerFactory import org.slf4j.MDC -import rx.Notification -import rx.Observable -import rx.Subscriber import rx.Subscription import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method @@ -486,74 +479,3 @@ class ObservableSubscription( ) typealias ObservableSubscriptionMap = Cache - -object RpcServerObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - private val log = LoggerFactory.getLogger(javaClass) - fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { - return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) - } - - override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { - throw UnsupportedOperationException() - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - val observableId = InvocationId.newInstance() - val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext - output.writeInvocationId(observableId) - val observableWithSubscription = ObservableSubscription( - // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing - // must be done again within the subscriber - subscription = observable.materialize().subscribe( - object : Subscriber>() { - override fun onNext(observation: Notification<*>) { - if (!isUnsubscribed) { - val message = RPCApi.ServerToClient.Observation( - id = observableId, - content = observation, - deduplicationIdentity = observableContext.deduplicationIdentity - ) - observableContext.sendMessage(message) - } - } - - override fun onError(exception: Throwable) { - log.error("onError called in materialize()d RPC Observable", exception) - } - - override fun onCompleted() { - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables != null) { - observables.remove(observableId) - if (observables.isEmpty()) { - null - } else { - observables - } - } else { - null - } - } - } - } - ) - ) - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables == null) { - hashSetOf(observableId) - } else { - observables.add(observableId) - observables - } - } - observableContext.observableMap.put(observableId, observableWithSubscription) - } - - private fun Output.writeInvocationId(id: InvocationId) { - - writeString(id.value) - writeLong(id.timestamp.toEpochMilli()) - } -} diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt index d270ab1d93..ae2c734f96 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt @@ -5,7 +5,7 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.DoNotImplement import net.corda.core.serialization.internal.* -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.serialization.* import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme