From 97e04ba7d042167ffbeb952b566a3aa1e2e58b9d Mon Sep 17 00:00:00 2001 From: "rick.parker" Date: Tue, 7 Jun 2016 15:37:52 +0100 Subject: [PATCH] Fix registration of services due to Kryo not following synthetic fields by default Review feedback --- .../core/node/services/testing/MockServices.kt | 2 +- .../com/r3corda/node/internal/AbstractNode.kt | 4 ++-- .../services/statemachine/StateMachineManager.kt | 16 ++++++++++------ 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt index 48ccb86659..54647fca4a 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt @@ -25,7 +25,7 @@ import java.util.jar.JarInputStream import javax.annotation.concurrent.ThreadSafe @ThreadSafe -class MockIdentityService(val identities: List) : IdentityService { +class MockIdentityService(val identities: List) : IdentityService, SingletonSerializeAsToken() { private val keyToParties: Map get() = synchronized(identities) { identities.associateBy { it.owningKey } } private val nameToParties: Map diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index f7ff0d5244..364a6a8966 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -126,12 +126,12 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, storage = storageServices.first checkpointStorage = storageServices.second net = makeMessagingService() - smm = StateMachineManager(services, checkpointStorage, serverThread) wallet = NodeWalletService(services) keyManagement = E2ETestKeyManagementService() makeInterestRatesOracleService() - api = APIServerImpl(this) identity = makeIdentityService() + api = APIServerImpl(this) + smm = StateMachineManager(services, listOf(storage, net, wallet, keyManagement, identity, platformClock), checkpointStorage, serverThread) // This object doesn't need to be referenced from this class because it registers handlers on the network // service and so that keeps it from being collected. diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index e361f5fd9c..e1b4b24513 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.FiberExecutorScheduler import co.paralleluniverse.io.serialization.kryo.KryoSerializer import com.codahale.metrics.Gauge +import com.esotericsoftware.kryo.Kryo import com.google.common.base.Throwables import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.abbreviate @@ -49,7 +50,7 @@ import javax.annotation.concurrent.ThreadSafe * TODO: Implement stub/skel classes that provide a basic RPC framework on top of this. */ @ThreadSafe -class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) { +class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableServices: List, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor) { inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor) val scheduler = FiberScheduler() @@ -70,7 +71,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor private val totalFinishedProtocols = metrics.counter("Protocols.Finished") // Context for tokenized services in checkpoints - private val serializationContext = SerializeAsTokenContext(serviceHub) + private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo()) /** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */ fun

, T> findStateMachines(protocolClass: Class

): List>> { @@ -130,13 +131,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor } private fun deserializeFiber(serialisedFiber: SerializedBytes>): ProtocolStateMachineImpl<*> { - val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer - val kryo = createKryo(deserializer.kryo) + val kryo = quasarKryo() // put the map of token -> tokenized into the kryo context SerializeAsTokenSerializer.setContext(kryo, serializationContext) return serialisedFiber.deserialize(kryo) } + private fun quasarKryo(): Kryo { + val serializer = Fiber.getFiberSerializer(false) as KryoSerializer + return createKryo(serializer.kryo) + } + private fun logError(e: Throwable, payload: Any?, topic: String?, psm: ProtocolStateMachineImpl<*>) { psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " + "when handling a message of type ${payload?.javaClass?.name} on topic $topic") @@ -214,8 +219,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor // We have a request to do something: send, receive, or send-and-receive. if (request is FiberRequest.ExpectingResponse<*>) { // We don't use the passed-in serializer here, because we need to use our own augmented Kryo. - val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer - val kryo = createKryo(deserializer.kryo) + val kryo = quasarKryo() // add the map of tokens -> tokenizedServices to the kyro context SerializeAsTokenSerializer.setContext(kryo, serializationContext) val serialisedFiber = fiber.serialize(kryo)