diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index b1aa48af5d..3f7a0481bb 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,11 +6,13 @@ release, see :doc:`upgrade-notes`. Unreleased ========== - -* Refactor RPC Client Kryo observable serialiser into it's own sub module -* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialised in - a factory without a serialiser as the subtype check for the class instance failed. Fix is to compare the raw +* Refactor RPC Server Kryo observable serializer into it's own sub module + +* Refactor RPC Client Kryo observable serializer into it's own sub module + +* Fix CORDA-1403 where a property of a class that implemented a generic interface could not be deserialized in + a factory without a serializer as the subtype check for the class instance failed. Fix is to compare the raw type. * Due to ongoing work the experimental interfaces for defining custom notary services have been moved to the internal package. diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt index 5a76745aee..5986848c74 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/crypto/X509UtilitiesTest.kt @@ -8,7 +8,7 @@ import net.corda.core.internal.div import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.serialization.AllWhitelist diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index d7acb47e7f..7f8a8c302b 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -14,7 +14,7 @@ import net.corda.core.internal.FetchDataFlow import net.corda.core.serialization.* import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.persistence.NodeAttachmentService import net.corda.nodeapi.internal.serialization.* import net.corda.testing.core.ALICE_NAME diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 3b4b2719f1..993a26e7a4 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -25,7 +25,7 @@ import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService import net.corda.node.services.config.* diff --git a/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt similarity index 91% rename from node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt rename to node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt index 2941bc479a..acf4c7f461 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/KryoServerSerializationScheme.kt @@ -1,9 +1,8 @@ -package net.corda.node.serialization +package net.corda.node.serialization.kryo import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.serialization.SerializationContext import net.corda.nodeapi.internal.serialization.CordaSerializationMagic -import net.corda.node.services.messaging.RpcServerObservableSerializer import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic diff --git a/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt new file mode 100644 index 0000000000..51013a9bce --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/serialization/kryo/RpcServerObservableSerializer.kt @@ -0,0 +1,87 @@ +package net.corda.node.serialization.kryo + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import net.corda.core.context.Trace +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults +import net.corda.node.services.messaging.ObservableSubscription +import net.corda.node.services.messaging.RPCServer +import net.corda.nodeapi.RPCApi +import org.slf4j.LoggerFactory +import rx.Notification +import rx.Observable +import rx.Subscriber + +object RpcServerObservableSerializer : Serializer>() { + private object RpcObservableContextKey + + private val log = LoggerFactory.getLogger(javaClass) + fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { + return SerializationDefaults.RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) + } + + override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { + throw UnsupportedOperationException() + } + + override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { + val observableId = Trace.InvocationId.newInstance() + val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext + output.writeInvocationId(observableId) + val observableWithSubscription = ObservableSubscription( + // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing + // must be done again within the subscriber + subscription = observable.materialize().subscribe( + object : Subscriber>() { + override fun onNext(observation: Notification<*>) { + if (!isUnsubscribed) { + val message = RPCApi.ServerToClient.Observation( + id = observableId, + content = observation, + deduplicationIdentity = observableContext.deduplicationIdentity + ) + observableContext.sendMessage(message) + } + } + + override fun onError(exception: Throwable) { + log.error("onError called in materialize()d RPC Observable", exception) + } + + override fun onCompleted() { + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables != null) { + observables.remove(observableId) + if (observables.isEmpty()) { + null + } else { + observables + } + } else { + null + } + } + } + } + ) + ) + observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> + if (observables == null) { + hashSetOf(observableId) + } else { + observables.add(observableId) + observables + } + } + observableContext.observableMap.put(observableId, observableWithSubscription) + } + + private fun Output.writeInvocationId(id: Trace.InvocationId) { + + writeString(id.value) + writeLong(id.timestamp.toEpochMilli()) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 39c8a8c3b5..5f7d193582 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -1,10 +1,6 @@ package net.corda.node.services.messaging import co.paralleluniverse.common.util.SameThreadExecutor -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalListener @@ -24,6 +20,7 @@ import net.corda.core.serialization.deserialize import net.corda.core.utilities.* import net.corda.node.internal.security.AuthorizingSubject import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.serialization.kryo.RpcServerObservableSerializer import net.corda.node.services.logging.pushToLoggingContext import net.corda.nodeapi.RPCApi import net.corda.nodeapi.externalTrace @@ -39,11 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.CoreNotificationType import org.apache.activemq.artemis.api.core.management.ManagementHelper -import org.slf4j.LoggerFactory import org.slf4j.MDC -import rx.Notification -import rx.Observable -import rx.Subscriber import rx.Subscription import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method @@ -486,74 +479,3 @@ class ObservableSubscription( ) typealias ObservableSubscriptionMap = Cache - -object RpcServerObservableSerializer : Serializer>() { - private object RpcObservableContextKey - - private val log = LoggerFactory.getLogger(javaClass) - fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext { - return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext) - } - - override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable { - throw UnsupportedOperationException() - } - - override fun write(kryo: Kryo, output: Output, observable: Observable<*>) { - val observableId = InvocationId.newInstance() - val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext - output.writeInvocationId(observableId) - val observableWithSubscription = ObservableSubscription( - // We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing - // must be done again within the subscriber - subscription = observable.materialize().subscribe( - object : Subscriber>() { - override fun onNext(observation: Notification<*>) { - if (!isUnsubscribed) { - val message = RPCApi.ServerToClient.Observation( - id = observableId, - content = observation, - deduplicationIdentity = observableContext.deduplicationIdentity - ) - observableContext.sendMessage(message) - } - } - - override fun onError(exception: Throwable) { - log.error("onError called in materialize()d RPC Observable", exception) - } - - override fun onCompleted() { - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables != null) { - observables.remove(observableId) - if (observables.isEmpty()) { - null - } else { - observables - } - } else { - null - } - } - } - } - ) - ) - observableContext.clientAddressToObservables.compute(observableContext.clientAddress) { _, observables -> - if (observables == null) { - hashSetOf(observableId) - } else { - observables.add(observableId) - observables - } - } - observableContext.observableMap.put(observableId, observableWithSubscription) - } - - private fun Output.writeInvocationId(id: InvocationId) { - - writeString(id.value) - writeLong(id.timestamp.toEpochMilli()) - } -} diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt index d270ab1d93..ae2c734f96 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/internal/InternalSerializationTestHelpers.kt @@ -5,7 +5,7 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.client.rpc.internal.serialization.kryo.KryoClientSerializationScheme import net.corda.core.DoNotImplement import net.corda.core.serialization.internal.* -import net.corda.node.serialization.KryoServerSerializationScheme +import net.corda.node.serialization.kryo.KryoServerSerializationScheme import net.corda.nodeapi.internal.serialization.* import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme