diff --git a/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt b/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt index e436574204..1e6173c08c 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/ServiceHub.kt @@ -12,9 +12,8 @@ import java.time.Clock * mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of * functionality and you don't want to hard-code which types in the interface. * - * All services exposed to protocols (public view) need to implement [SerializeAsToken] or similar to avoid being serialized in checkpoints. - * - * TODO: Split into a public (to contracts etc) and private (to node) view + * Any services exposed to protocols (public view) need to implement [SerializeAsToken] or similar to avoid their internal + * state from being serialized in checkpoints. */ interface ServiceHub { val walletService: WalletService diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt index e67d4d9f1b..48ccb86659 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/testing/MockServices.kt @@ -10,6 +10,7 @@ import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.IdentityService import com.r3corda.core.node.services.KeyManagementService import com.r3corda.core.node.services.StorageService +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.RecordingMap import org.slf4j.LoggerFactory import java.io.ByteArrayInputStream @@ -36,7 +37,7 @@ class MockIdentityService(val identities: List) : IdentityService { } -class MockKeyManagementService(vararg initialKeys: KeyPair) : KeyManagementService { +class MockKeyManagementService(vararg initialKeys: KeyPair) : SingletonSerializeAsToken(), KeyManagementService { override val keys: MutableMap init { @@ -88,7 +89,7 @@ class MockStorageService(override val attachments: AttachmentStorage = MockAttac override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), // This parameter is for unit tests that want to observe operation details. val recordingAs: (String) -> String = { tableName -> "" }) -: StorageService { +: SingletonSerializeAsToken(), StorageService { protected val tables = HashMap>() private fun getMapOriginal(tableName: String): MutableMap { diff --git a/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt b/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt index 5d4c397e92..39a773add2 100644 --- a/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt @@ -284,6 +284,8 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // This ensures a SerializedBytes wrapper is written out as just a byte array. register(SerializedBytes::class.java, SerializedBytesSerializer) + + addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer()) } } diff --git a/core/src/main/kotlin/com/r3corda/core/serialization/SerializationToken.kt b/core/src/main/kotlin/com/r3corda/core/serialization/SerializationToken.kt index 0e2e50418d..9aa1fa4d4d 100644 --- a/core/src/main/kotlin/com/r3corda/core/serialization/SerializationToken.kt +++ b/core/src/main/kotlin/com/r3corda/core/serialization/SerializationToken.kt @@ -1,12 +1,10 @@ package com.r3corda.core.serialization -import com.esotericsoftware.kryo.DefaultSerializer import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import java.lang.ref.WeakReference import java.util.* /** @@ -23,76 +21,113 @@ import java.util.* * they are serialized because they have a lot of internal state that does not serialize (well). * * This models a similar pattern to the readReplace/writeReplace methods in Java serialization. - * - * With Kryo serialisation, these classes should also annotate themselves with @DefaultSerializer. See below. - * */ interface SerializeAsToken { - val token: SerializationToken + fun toToken(context: SerializeAsTokenContext): SerializationToken } /** * This represents a token in the serialized stream for an instance of a type that implements [SerializeAsToken] */ interface SerializationToken { - fun fromToken(): Any + fun fromToken(context: SerializeAsTokenContext): Any } /** * A Kryo serializer for [SerializeAsToken] implementations. * - * Annotate the [SerializeAsToken] with @DefaultSerializer(SerializeAsTokenSerializer::class) + * This is registered in [createKryo]. */ class SerializeAsTokenSerializer : Serializer() { override fun write(kryo: Kryo, output: Output, obj: T) { - kryo.writeClassAndObject(output, obj.token) + kryo.writeClassAndObject(output, obj.toToken(getContext(kryo) ?: throw KryoException("Attempt to write a ${SerializeAsToken::class.simpleName} instance of ${obj.javaClass.name} without initialising a context"))) } override fun read(kryo: Kryo, input: Input, type: Class): T { val token = (kryo.readClassAndObject(input) as? SerializationToken) ?: throw KryoException("Non-token read for tokenized type: ${type.name}") - val fromToken = token.fromToken() + val fromToken = token.fromToken(getContext(kryo) ?: throw KryoException("Attempt to read a token for a ${SerializeAsToken::class.simpleName} instance of ${type.name} without initialising a context")) if (type.isAssignableFrom(fromToken.javaClass)) { return type.cast(fromToken) } else { - throw KryoException("Token read did not return tokenized type: ${type.name}") + throw KryoException("Token read ($token) did not return expected tokenized type: ${type.name}") + } + } + + companion object { + private fun getContext(kryo: Kryo): SerializeAsTokenContext? = kryo.context.get(SerializeAsTokenContext::class.java) as? SerializeAsTokenContext + + fun setContext(kryo: Kryo, context: SerializeAsTokenContext) { + kryo.context.put(SerializeAsTokenContext::class.java, context) + } + + fun clearContext(kryo: Kryo) { + kryo.context.remove(SerializeAsTokenContext::class.java) } } } /** - * A class representing a [SerializationToken] for some object that is not serializable but can be re-created or looked up - * (when deserialized) via a [String] key. + * A context for mapping SerializationTokens to/from SerializeAsTokens. + * + * A context is initialised with an object containing all the instances of [SerializeAsToken] to eagerly register all the tokens. + * In our case this can be the [ServiceHub]. + * + * Then it is a case of using the companion object methods on [SerializeAsTokenSerializer] to set and clear context as necessary + * on the Kryo instance when serializing to enable/disable tokenization. */ -private data class SerializationStringToken(private val key: String, private val className: String) : SerializationToken { +class SerializeAsTokenContext(toBeTokenized: Any, kryo: Kryo = createKryo()) { + internal val tokenToTokenized = HashMap() + internal var readOnly = false - constructor(key: String, toBeProxied: SerializeAsStringToken) : this(key, toBeProxied.javaClass.name) { - tokenized.put(this, WeakReference(toBeProxied)) + init { + /* + * Go ahead and eagerly serialize the object to register all of the tokens in the context. + * + * This results in the toToken() method getting called for any [SerializeAsStringToken] instances which + * are encountered in the object graph as they are serialized by Kryo and will therefore register the token to + * object mapping for those instances. We then immediately set the readOnly flag to stop further adhoc or + * accidental registrations from occuring as these could not be deserialized in a deserialization-first + * scenario if they are not part of this iniital context construction serialization. + */ + SerializeAsTokenSerializer.setContext(kryo, this) + toBeTokenized.serialize(kryo) + SerializeAsTokenSerializer.clearContext(kryo) + readOnly = true } +} + +/** + * A class representing a [SerializationToken] for some object that is not serializable but can be looked up + * (when deserialized) via just the class name. + */ +data class SingletonSerializationToken private constructor(private val className: String) : SerializationToken { + + constructor(toBeTokenized: SerializeAsToken) : this(toBeTokenized.javaClass.name) + + override fun fromToken(context: SerializeAsTokenContext): Any = context.tokenToTokenized[this] ?: + throw IllegalStateException("Unable to find tokenized instance of ${className} in context $context") companion object { - val tokenized = Collections.synchronizedMap(WeakHashMap>()) - } + fun registerWithContext(token: SingletonSerializationToken, toBeTokenized: SerializeAsToken, context: SerializeAsTokenContext): SerializationToken = + if (token in context.tokenToTokenized) token else registerNewToken(token, toBeTokenized, context) - override fun fromToken(): Any = tokenized.get(this)?.get() ?: - throw IllegalStateException("Unable to find tokenized instance of ${className} for key $key") + // Only allowable if we are in SerializeAsTokenContext init (readOnly == false) + private fun registerNewToken(token: SingletonSerializationToken, toBeTokenized: SerializeAsToken, context: SerializeAsTokenContext): SerializationToken { + if (context.readOnly) throw UnsupportedOperationException("Attempt to write token for lazy registered ${toBeTokenized.javaClass.name}. " + + "All tokens should be registered during context construction.") + context.tokenToTokenized[token] = toBeTokenized + return token + } + } } /** * A base class for implementing large objects / components / services that need to serialize themselves to a string token * to indicate which instance the token is a serialized form of. - * - * This class will also double check that the class is annotated for Kryo serialization. Note it does this on every - * instance constructed but given this is designed to represent heavyweight services or components, this should not be significant. */ -abstract class SerializeAsStringToken(val key: String) : SerializeAsToken { +abstract class SingletonSerializeAsToken() : SerializeAsToken { - init { - // Verify we have the annotation - val annotation = javaClass.getAnnotation(DefaultSerializer::class.java) - if (annotation == null || annotation.value.java.name != SerializeAsTokenSerializer::class.java.name) { - throw IllegalStateException("${this.javaClass.name} is not annotated with @${DefaultSerializer::class.java.simpleName} set to ${SerializeAsTokenSerializer::class.java.simpleName}") - } - } + private val token = SingletonSerializationToken(this) - override val token: SerializationToken = SerializationStringToken(key, this) + override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context) } \ No newline at end of file diff --git a/core/src/test/kotlin/com/r3corda/core/serialization/SerializationTokenTest.kt b/core/src/test/kotlin/com/r3corda/core/serialization/SerializationTokenTest.kt index 786bc62601..44f27702e8 100644 --- a/core/src/test/kotlin/com/r3corda/core/serialization/SerializationTokenTest.kt +++ b/core/src/test/kotlin/com/r3corda/core/serialization/SerializationTokenTest.kt @@ -1,17 +1,34 @@ package com.r3corda.core.serialization -import com.esotericsoftware.kryo.DefaultSerializer +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoException +import com.esotericsoftware.kryo.io.Output import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before import org.junit.Test -import kotlin.test.assertEquals -import kotlin.test.assertNotEquals +import java.io.ByteArrayOutputStream class SerializationTokenTest { + lateinit var kryo: Kryo + + @Before + fun setup() { + kryo = THREAD_LOCAL_KRYO.get() + } + + @After + fun cleanup() { + SerializeAsTokenSerializer.clearContext(kryo) + } + // Large tokenizable object so we can tell from the smaller number of serialized bytes it was actually tokenized - @DefaultSerializer(SerializeAsTokenSerializer::class) - private class LargeTokenizable(size: Int) : SerializeAsStringToken(size.toString()) { - val bytes = OpaqueBytes(ByteArray(size)) + private class LargeTokenizable : SingletonSerializeAsToken() { + val bytes = OpaqueBytes(ByteArray(1024)) + + val numBytes: Int + get() = bytes.size override fun hashCode() = bytes.bits.size @@ -20,61 +37,78 @@ class SerializationTokenTest { @Test fun `write token and read tokenizable`() { - val numBytes = 1024 - val tokenizableBefore = LargeTokenizable(numBytes) - val serializedBytes = tokenizableBefore.serialize() - assertThat(serializedBytes.size).isLessThan(numBytes) - val tokenizableAfter = serializedBytes.deserialize() - assertEquals(tokenizableBefore, tokenizableAfter) - } - - @Test - fun `check same sized tokenizable equal`() { - val tokenizableBefore = LargeTokenizable(1024) - val tokenizableAfter = LargeTokenizable(1024) - assertEquals(tokenizableBefore, tokenizableAfter) - } - - @Test - fun `check different sized tokenizable not equal`() { - val tokenizableBefore = LargeTokenizable(1024) - val tokenizableAfter = LargeTokenizable(1025) - assertNotEquals(tokenizableBefore, tokenizableAfter) - } - - @DefaultSerializer(SerializeAsTokenSerializer::class) - private class IntegerSerializeAsKeyedToken(val value: Int) : SerializeAsStringToken(value.toString()) - - @Test - fun `write and read keyed`() { - val tokenizableBefore1 = IntegerSerializeAsKeyedToken(123) - val tokenizableBefore2 = IntegerSerializeAsKeyedToken(456) - - val serializedBytes1 = tokenizableBefore1.serialize() - val tokenizableAfter1 = serializedBytes1.deserialize() - val serializedBytes2 = tokenizableBefore2.serialize() - val tokenizableAfter2 = serializedBytes2.deserialize() - - assertThat(tokenizableAfter1).isSameAs(tokenizableBefore1) - assertThat(tokenizableAfter2).isSameAs(tokenizableBefore2) - } - - @DefaultSerializer(SerializeAsTokenSerializer::class) - private class UnitSerializeAsSingletonToken : SerializeAsStringToken("Unit0") - - @Test - fun `write and read singleton`() { - val tokenizableBefore = UnitSerializeAsSingletonToken() - val serializedBytes = tokenizableBefore.serialize() - val tokenizableAfter = serializedBytes.deserialize() + val tokenizableBefore = LargeTokenizable() + val context = SerializeAsTokenContext(tokenizableBefore, kryo) + SerializeAsTokenSerializer.setContext(kryo, context) + val serializedBytes = tokenizableBefore.serialize(kryo) + assertThat(serializedBytes.size).isLessThan(tokenizableBefore.numBytes) + val tokenizableAfter = serializedBytes.deserialize(kryo) assertThat(tokenizableAfter).isSameAs(tokenizableBefore) } - private class UnannotatedSerializeAsSingletonToken : SerializeAsStringToken("Unannotated0") + private class UnitSerializeAsToken : SingletonSerializeAsToken() - @Test(expected = IllegalStateException::class) - fun `unannotated throws`() { - @Suppress("UNUSED_VARIABLE") - val tokenizableBefore = UnannotatedSerializeAsSingletonToken() + @Test + fun `write and read singleton`() { + val tokenizableBefore = UnitSerializeAsToken() + val context = SerializeAsTokenContext(tokenizableBefore, kryo) + SerializeAsTokenSerializer.setContext(kryo, context) + val serializedBytes = tokenizableBefore.serialize(kryo) + val tokenizableAfter = serializedBytes.deserialize(kryo) + assertThat(tokenizableAfter).isSameAs(tokenizableBefore) + } + + @Test(expected = UnsupportedOperationException::class) + fun `new token encountered after context init`() { + val tokenizableBefore = UnitSerializeAsToken() + val context = SerializeAsTokenContext(emptyList(), kryo) + SerializeAsTokenSerializer.setContext(kryo, context) + tokenizableBefore.serialize(kryo) + } + + @Test(expected = UnsupportedOperationException::class) + fun `deserialize unregistered token`() { + val tokenizableBefore = UnitSerializeAsToken() + val context = SerializeAsTokenContext(emptyList(), kryo) + SerializeAsTokenSerializer.setContext(kryo, context) + val serializedBytes = tokenizableBefore.toToken(SerializeAsTokenContext(emptyList(), kryo)).serialize(kryo) + serializedBytes.deserialize(kryo) + } + + @Test(expected = KryoException::class) + fun `no context set`() { + val tokenizableBefore = UnitSerializeAsToken() + tokenizableBefore.serialize(kryo) + } + + @Test(expected = KryoException::class) + fun `deserialize non-token`() { + val tokenizableBefore = UnitSerializeAsToken() + val context = SerializeAsTokenContext(tokenizableBefore, kryo) + SerializeAsTokenSerializer.setContext(kryo, context) + val stream = ByteArrayOutputStream() + Output(stream).use { + kryo.writeClass(it, SingletonSerializeAsToken::class.java) + kryo.writeObject(it, emptyList()) + } + val serializedBytes = SerializedBytes(stream.toByteArray()) + serializedBytes.deserialize(kryo) + } + + private class WrongTypeSerializeAsToken : SerializeAsToken { + override fun toToken(context: SerializeAsTokenContext): SerializationToken { + return object : SerializationToken { + override fun fromToken(context: SerializeAsTokenContext): Any = UnitSerializeAsToken() + } + } + } + + @Test(expected = KryoException::class) + fun `token returns unexpected type`() { + val tokenizableBefore = WrongTypeSerializeAsToken() + val context = SerializeAsTokenContext(tokenizableBefore, kryo) + SerializeAsTokenSerializer.setContext(kryo, context) + val serializedBytes = tokenizableBefore.serialize(kryo) + serializedBytes.deserialize(kryo) } } \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 0c51a7470a..7ce76b0e18 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -7,6 +7,7 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.core.utilities.loggerFor import com.r3corda.node.api.APIServer +import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.servlets.AttachmentDownloadServlet @@ -52,7 +53,7 @@ class ConfigurationException(message: String) : Exception(message) */ class Node(dir: Path, val p2pAddr: HostAndPort, configuration: NodeConfiguration, networkMapAddress: NodeInfo?, advertisedServices: Set, - clock: Clock = Clock.systemUTC(), + clock: Clock = NodeClock(), val clientAPIs: List> = listOf()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) { companion object { /** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */ diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt index be3f5d3287..64573a7c1a 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt @@ -12,6 +12,7 @@ import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.testing.MockIdentityService import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.AbstractNode +import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.NetworkMapService @@ -21,7 +22,6 @@ import org.slf4j.Logger import java.nio.file.Files import java.nio.file.Path import java.security.KeyPair -import java.time.Clock import java.util.* /** @@ -66,7 +66,7 @@ class MockNetwork(private val threadPerNode: Boolean = false, } open class MockNode(dir: Path, config: NodeConfiguration, val mockNet: MockNetwork, networkMapAddr: NodeInfo?, - advertisedServices: Set, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, Clock.systemUTC()) { + advertisedServices: Set, val id: Int, val keyPair: KeyPair?) : AbstractNode(dir, config, networkMapAddr, advertisedServices, NodeClock()) { override val log: Logger = loggerFor() override val serverThread: AffinityExecutor = if (mockNet.threadPerNode) diff --git a/node/src/main/kotlin/com/r3corda/node/serialization/NodeClock.kt b/node/src/main/kotlin/com/r3corda/node/serialization/NodeClock.kt new file mode 100644 index 0000000000..a7ca24fd3f --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/serialization/NodeClock.kt @@ -0,0 +1,35 @@ +package com.r3corda.node.serialization + +import com.r3corda.core.serialization.SerializeAsToken +import com.r3corda.core.serialization.SerializeAsTokenContext +import com.r3corda.core.serialization.SingletonSerializationToken +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import javax.annotation.concurrent.ThreadSafe + + +/** + * A [Clock] that tokenizes itself when serialized, and delegates to an underlying [Clock] implementation. + */ +@ThreadSafe +class NodeClock(private val delegateClock: Clock = Clock.systemUTC()) : Clock(), SerializeAsToken { + + private val token = SingletonSerializationToken(this) + + override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context) + + override fun instant(): Instant { + return delegateClock.instant() + } + + // Do not use this. Instead seek to use ZonedDateTime methods. + override fun withZone(zone: ZoneId): Clock { + throw UnsupportedOperationException("Tokenized clock does not support withZone()") + } + + override fun getZone(): ZoneId { + return delegateClock.zone + } + +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/MonitoringService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/MonitoringService.kt index 3b7fd2a2dc..97d84b8b79 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/MonitoringService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/MonitoringService.kt @@ -1,10 +1,11 @@ package com.r3corda.node.services.api import com.codahale.metrics.MetricRegistry +import com.r3corda.core.serialization.SingletonSerializeAsToken /** * Provides access to various metrics and ways to notify monitoring services of things, for sysadmin purposes. * This is not an interface because it is too lightweight to bother mocking out. */ -class MonitoringService(val metrics: MetricRegistry) \ No newline at end of file +class MonitoringService(val metrics: MetricRegistry) : SingletonSerializeAsToken() \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/identity/InMemoryIdentityService.kt b/node/src/main/kotlin/com/r3corda/node/services/identity/InMemoryIdentityService.kt index d245b9f5b9..243dbda4c9 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/identity/InMemoryIdentityService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/identity/InMemoryIdentityService.kt @@ -2,6 +2,7 @@ package com.r3corda.node.services.identity import com.r3corda.core.crypto.Party import com.r3corda.core.node.services.IdentityService +import com.r3corda.core.serialization.SingletonSerializeAsToken import java.security.PublicKey import java.util.concurrent.ConcurrentHashMap import javax.annotation.concurrent.ThreadSafe @@ -10,7 +11,7 @@ import javax.annotation.concurrent.ThreadSafe * Simple identity service which caches parties and provides functionality for efficient lookup. */ @ThreadSafe -class InMemoryIdentityService() : IdentityService { +class InMemoryIdentityService() : SingletonSerializeAsToken(), IdentityService { private val keyToParties = ConcurrentHashMap() private val nameToParties = ConcurrentHashMap() diff --git a/node/src/main/kotlin/com/r3corda/node/services/keys/E2ETestKeyManagementService.kt b/node/src/main/kotlin/com/r3corda/node/services/keys/E2ETestKeyManagementService.kt index b3528b4138..05c4247caf 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/keys/E2ETestKeyManagementService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/keys/E2ETestKeyManagementService.kt @@ -3,6 +3,7 @@ package com.r3corda.node.services.keys import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.node.services.KeyManagementService +import com.r3corda.core.serialization.SingletonSerializeAsToken import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -21,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe * etc */ @ThreadSafe -class E2ETestKeyManagementService : KeyManagementService { +class E2ETestKeyManagementService() : SingletonSerializeAsToken(), KeyManagementService { private class InnerState { val keys = HashMap() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt index d99485fe91..479d89c210 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt @@ -4,8 +4,9 @@ import com.google.common.net.HostAndPort import com.r3corda.core.RunOnCallerThread import com.r3corda.core.ThreadBox import com.r3corda.core.messaging.* -import com.r3corda.node.internal.Node +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.internal.Node import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.client.* @@ -52,7 +53,7 @@ import javax.annotation.concurrent.ThreadSafe */ @ThreadSafe class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, - val defaultExecutor: Executor = RunOnCallerThread) : MessagingService { + val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService { // In future: can contain onion routing info, etc. private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index 03192a4b53..fa5e61e6f0 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -6,6 +6,7 @@ import com.google.common.util.concurrent.MoreExecutors import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.sha256 import com.r3corda.core.messaging.* +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace import org.slf4j.LoggerFactory @@ -28,7 +29,7 @@ import kotlin.concurrent.thread * testing). */ @ThreadSafe -class InMemoryMessagingNetwork { +class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { companion object { val MESSAGES_LOG_NAME = "messages" private val log = LoggerFactory.getLogger(MESSAGES_LOG_NAME) @@ -167,7 +168,7 @@ class InMemoryMessagingNetwork { * An instance can be obtained by creating a builder and then using the start method. */ @ThreadSafe - inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : MessagingService { + inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingService { inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 8fb9f2ae74..6c182f2988 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -15,6 +15,7 @@ import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX import com.r3corda.core.random63BitValue +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.node.services.api.RegulatorService @@ -30,7 +31,7 @@ import javax.annotation.concurrent.ThreadSafe * Extremely simple in-memory cache of the network map. */ @ThreadSafe -open class InMemoryNetworkMapCache() : NetworkMapCache { +open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCache { override val networkMapNodes: List get() = get(NetworkMapService.Type) override val regulators: List diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt index 4c9e1751c8..a59c936e5a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt @@ -1,15 +1,8 @@ -/* - * Copyright 2016 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members - * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms - * set forth therein. - * - * All other rights reserved. - */ package com.r3corda.node.services.network import co.paralleluniverse.common.util.VisibleForTesting -import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.DummyPublicKey +import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt index 3c81f85ae8..86a485773e 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/StorageServiceImpl.kt @@ -1,10 +1,11 @@ package com.r3corda.node.services.persistence -import com.r3corda.core.crypto.Party import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.services.AttachmentStorage import com.r3corda.core.node.services.StorageService +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.RecordingMap import org.slf4j.LoggerFactory import java.security.KeyPair @@ -15,7 +16,7 @@ open class StorageServiceImpl(override val attachments: AttachmentStorage, override val myLegalIdentity: Party = Party("Unit test party", myLegalIdentityKey.public), // This parameter is for unit tests that want to observe operation details. val recordingAs: (String) -> String = { tableName -> "" }) -: StorageService { +: SingletonSerializeAsToken(), StorageService { protected val tables = HashMap>() private fun getMapOriginal(tableName: String): MutableMap { diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt index 3233daf20f..e4120e96d9 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt @@ -3,17 +3,11 @@ package com.r3corda.node.services.statemachine import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.FiberScheduler import co.paralleluniverse.fibers.Suspendable -import co.paralleluniverse.io.serialization.kryo.KryoSerializer import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.messaging.MessageRecipients -import com.r3corda.node.services.statemachine.StateMachineManager -import com.r3corda.core.node.ServiceHub import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolStateMachine -import com.r3corda.core.serialization.SerializedBytes -import com.r3corda.core.serialization.createKryo -import com.r3corda.core.serialization.serialize import com.r3corda.core.utilities.UntrustworthyData import com.r3corda.node.services.api.ServiceHubInternal import org.slf4j.Logger @@ -30,7 +24,7 @@ import org.slf4j.LoggerFactory class ProtocolStateMachineImpl(val logic: ProtocolLogic, scheduler: FiberScheduler, val loggerName: String) : Fiber("protocol", scheduler), ProtocolStateMachine { // These fields shouldn't be serialised, so they are marked @Transient. - @Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, serialisedFiber: SerializedBytes>) -> Unit)? = null + @Transient private var suspendAction: ((result: StateMachineManager.FiberRequest, fiber: ProtocolStateMachineImpl<*>) -> Unit)? = null @Transient private var resumeWithObject: Any? = null @Transient lateinit override var serviceHub: ServiceHubInternal @@ -59,7 +53,7 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, scheduler: FiberS fun prepareForResumeWith(serviceHub: ServiceHubInternal, withObject: Any?, - suspendAction: (StateMachineManager.FiberRequest, SerializedBytes>) -> Unit) { + suspendAction: (StateMachineManager.FiberRequest, ProtocolStateMachineImpl<*>) -> Unit) { this.suspendAction = suspendAction this.resumeWithObject = withObject this.serviceHub = serviceHub @@ -108,10 +102,7 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, scheduler: FiberS @Suspendable private fun suspend(with: StateMachineManager.FiberRequest) { parkAndSerialize { fiber, serializer -> - // We don't use the passed-in serializer here, because we need to use our own augmented Kryo. - val deserializer = getFiberSerializer(false) as KryoSerializer - val kryo = createKryo(deserializer.kryo) - suspendAction!!(with, this.serialize(kryo)) + suspendAction!!(with, this) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index 506d8a038a..319c03897c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -12,10 +12,7 @@ import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.send import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolStateMachine -import com.r3corda.core.serialization.SerializedBytes -import com.r3corda.core.serialization.THREAD_LOCAL_KRYO -import com.r3corda.core.serialization.createKryo -import com.r3corda.core.serialization.deserialize +import com.r3corda.core.serialization.* import com.r3corda.core.then import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.trace @@ -51,8 +48,6 @@ import javax.annotation.concurrent.ThreadSafe * TODO: Timeouts * TODO: Surfacing of exceptions via an API and/or management UI * TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt - * TODO: Make Kryo (de)serialize markers for heavy objects that are currently in the service hub. This avoids mistakes - * where services are temporarily put on the stack. * TODO: Implement stub/skel classes that provide a basic RPC framework on top of this. */ @ThreadSafe @@ -76,6 +71,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor private val totalStartedProtocols = metrics.counter("Protocols.Started") private val totalFinishedProtocols = metrics.counter("Protocols.Finished") + // Context for tokenized services in checkpoints + private val serializationContext = SerializeAsTokenContext(serviceHub) + /** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */ fun findStateMachines(klass: Class>): List, ListenableFuture>> { synchronized(stateMachines) { @@ -139,6 +137,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor private fun deserializeFiber(serialisedFiber: SerializedBytes>): ProtocolStateMachineImpl<*> { val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer val kryo = createKryo(deserializer.kryo) + // put the map of token -> tokenized into the kryo context + SerializeAsTokenSerializer.setContext(kryo, serializationContext) return serialisedFiber.deserialize(kryo) as ProtocolStateMachineImpl<*> } @@ -202,9 +202,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val checkpointStor obj: Any?, resumeFunc: (ProtocolStateMachineImpl<*>) -> Unit) { executor.checkOnThread() - val onSuspend = fun(request: FiberRequest, serialisedFiber: SerializedBytes>) { + val onSuspend = fun(request: FiberRequest, fiber: ProtocolStateMachineImpl<*>) { // We have a request to do something: send, receive, or send-and-receive. if (request is FiberRequest.ExpectingResponse<*>) { + // We don't use the passed-in serializer here, because we need to use our own augmented Kryo. + val deserializer = Fiber.getFiberSerializer(false) as KryoSerializer + val kryo = createKryo(deserializer.kryo) + // add the map of tokens -> tokenizedServices to the kyro context + SerializeAsTokenSerializer.setContext(kryo, serializationContext) + val serialisedFiber = fiber.serialize(kryo) // Prepare a listener on the network that runs in the background thread when we received a message. checkpointAndSetupMessageHandler(psm, request, serialisedFiber) } diff --git a/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt b/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt index 6f73708568..ce0a5a656d 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/wallet/NodeWalletService.kt @@ -8,6 +8,7 @@ import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.services.Wallet import com.r3corda.core.node.services.WalletService +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace import com.r3corda.node.services.api.ServiceHubInternal @@ -21,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe * states relevant to us into a database and once such a wallet is implemented, this scaffolding can be removed. */ @ThreadSafe -class NodeWalletService(private val services: ServiceHubInternal) : WalletService { +class NodeWalletService(private val services: ServiceHubInternal) : SingletonSerializeAsToken(), WalletService { private val log = loggerFor() // Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt index bc24e12c91..b4331bf88a 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt @@ -5,6 +5,7 @@ import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE +import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.Checkpoint import com.r3corda.node.services.api.CheckpointStorage import com.r3corda.node.services.api.MonitoringService @@ -40,7 +41,7 @@ class MockServices( val storage: StorageService? = MockStorageService(), val mapCache: NetworkMapCache? = MockNetworkMapCache(), val mapService: NetworkMapService? = null, - val overrideClock: Clock? = Clock.systemUTC() + val overrideClock: Clock? = NodeClock() ) : ServiceHubInternal { override val walletService: WalletService = customWallet ?: NodeWalletService(this) diff --git a/src/main/kotlin/com/r3corda/demos/DemoClock.kt b/src/main/kotlin/com/r3corda/demos/DemoClock.kt index 3021a4e7aa..d113d31056 100644 --- a/src/main/kotlin/com/r3corda/demos/DemoClock.kt +++ b/src/main/kotlin/com/r3corda/demos/DemoClock.kt @@ -1,5 +1,8 @@ package com.r3corda.demos +import com.r3corda.core.serialization.SerializeAsToken +import com.r3corda.core.serialization.SerializeAsTokenContext +import com.r3corda.core.serialization.SingletonSerializationToken import com.r3corda.node.utilities.MutableClock import java.time.* import javax.annotation.concurrent.ThreadSafe @@ -8,7 +11,11 @@ import javax.annotation.concurrent.ThreadSafe * A [Clock] that can have the date advanced for use in demos */ @ThreadSafe -class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() { +class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock(), SerializeAsToken { + + private val token = SingletonSerializationToken(this) + + override fun toToken(context: SerializeAsTokenContext) = SingletonSerializationToken.registerWithContext(token, this, context) @Synchronized fun updateDate(date: LocalDate): Boolean { val currentDate = LocalDate.now(this) @@ -25,8 +32,9 @@ class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableC return delegateClock.instant() } - @Synchronized override fun withZone(zone: ZoneId): Clock { - return DemoClock(delegateClock.withZone(zone)) + // Do not use this. Instead seek to use ZonedDateTime methods. + override fun withZone(zone: ZoneId): Clock { + throw UnsupportedOperationException("Tokenized clock does not support withZone()") } @Synchronized override fun getZone(): ZoneId {