From f3a5f8e659348b7308d5fef61adba95a8f4b1a3f Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Thu, 16 Mar 2017 08:24:06 +0000 Subject: [PATCH] Pool Kryo instances for efficiency. (#352) Pooled Kryo --- .../net/corda/core/serialization/Kryo.kt | 77 +++++++-------- .../core/serialization/SerializationToken.kt | 16 +-- .../core/transactions/MerkleTransaction.kt | 7 +- .../core/transactions/WireTransaction.kt | 6 +- .../corda/core/utilities/ProgressTracker.kt | 2 +- .../core/crypto/PartialMerkleTreeTest.kt | 25 +++-- .../core/node/AttachmentClassLoaderTests.kt | 25 +++-- .../net/corda/core/serialization/KryoTests.kt | 15 ++- .../serialization/SerializationTokenTest.kt | 17 ++-- .../core/utilities/ProgressTrackerTest.kt | 4 +- .../node/serialization/DefaultWhitelist.kt | 4 +- .../services/messaging/CordaRPCClientImpl.kt | 55 ++++++----- .../node/services/messaging/RPCDispatcher.kt | 33 ++++--- .../node/services/messaging/RPCStructures.kt | 98 ++++++++++++------- .../persistence/DBCheckpointStorage.kt | 4 +- .../statemachine/StateMachineManager.kt | 37 ++++--- .../node/services/vault/NodeVaultService.kt | 13 +-- .../net/corda/node/utilities/JDBCHashMap.kt | 4 +- .../database/RequeryConfigurationTest.kt | 4 +- .../kotlin/net/corda/irs/testing/IRSTests.kt | 13 ++- 20 files changed, 259 insertions(+), 200 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index cb6d20523c..a93e5e5140 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -3,9 +3,11 @@ package net.corda.core.serialization import com.esotericsoftware.kryo.* import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.pool.KryoPool import com.esotericsoftware.kryo.serializers.JavaSerializer import com.esotericsoftware.kryo.serializers.MapSerializer import com.esotericsoftware.kryo.util.MapReferenceResolver +import com.google.common.annotations.VisibleForTesting import net.corda.core.contracts.* import net.corda.core.crypto.* import net.corda.core.node.AttachmentsClassLoader @@ -60,12 +62,9 @@ import kotlin.reflect.jvm.javaType */ // A convenient instance of Kryo pre-configured with some useful things. Used as a default by various functions. -private val THREAD_LOCAL_KRYO: ThreadLocal = ThreadLocal.withInitial { createKryo() } +fun p2PKryo(): KryoPool = kryoPool // Same again, but this has whitelisting turned off for internal storage use only. -private val INTERNAL_THREAD_LOCAL_KRYO: ThreadLocal = ThreadLocal.withInitial { createInternalKryo() } - -fun threadLocalP2PKryo(): Kryo = THREAD_LOCAL_KRYO.get() -fun threadLocalStorageKryo(): Kryo = INTERNAL_THREAD_LOCAL_KRYO.get() +fun storageKryo(): KryoPool = internalKryoPool /** * A type safe wrapper around a byte array that contains a serialised object. You can call [SerializedBytes.deserialize] @@ -82,26 +81,34 @@ class SerializedBytes(bytes: ByteArray, val internalOnly: Boolean = fal private val KryoHeaderV0_1: OpaqueBytes = OpaqueBytes("corda\u0000\u0000\u0001".toByteArray()) // Some extension functions that make deserialisation convenient and provide auto-casting of the result. -fun ByteArray.deserialize(kryo: Kryo = threadLocalP2PKryo()): T { +fun ByteArray.deserialize(kryo: KryoPool = p2PKryo()): T { Input(this).use { val header = OpaqueBytes(it.readBytes(8)) if (header != KryoHeaderV0_1) { throw KryoException("Serialized bytes header does not match any known format.") } @Suppress("UNCHECKED_CAST") - return kryo.readClassAndObject(it) as T + return kryo.run { k -> k.readClassAndObject(it) as T } } } -fun OpaqueBytes.deserialize(kryo: Kryo = threadLocalP2PKryo()): T { +// TODO: The preferred usage is with a pool. Try and eliminate use of this from RPC. +fun ByteArray.deserialize(kryo: Kryo): T = deserialize(kryo.asPool()) + +fun OpaqueBytes.deserialize(kryo: KryoPool = p2PKryo()): T { return this.bytes.deserialize(kryo) } // The more specific deserialize version results in the bytes being cached, which is faster. @JvmName("SerializedBytesWireTransaction") -fun SerializedBytes.deserialize(kryo: Kryo = threadLocalP2PKryo()): WireTransaction = WireTransaction.deserialize(this, kryo) +fun SerializedBytes.deserialize(kryo: KryoPool = p2PKryo()): WireTransaction = WireTransaction.deserialize(this, kryo) -fun SerializedBytes.deserialize(kryo: Kryo = if (internalOnly) threadLocalStorageKryo() else threadLocalP2PKryo()): T = bytes.deserialize(kryo) +fun SerializedBytes.deserialize(kryo: KryoPool = if (internalOnly) storageKryo() else p2PKryo()): T = bytes.deserialize(kryo) + +fun SerializedBytes.deserialize(kryo: Kryo): T = bytes.deserialize(kryo.asPool()) + +// Internal adapter for use when we haven't yet converted to a pool, or for tests. +private fun Kryo.asPool(): KryoPool = (KryoPool.Builder { this }.build()) /** * A serialiser that avoids writing the wrapper class to the byte stream, thus ensuring [SerializedBytes] is a pure @@ -122,7 +129,11 @@ object SerializedBytesSerializer : Serializer>() { * Can be called on any object to convert it to a byte array (wrapped by [SerializedBytes]), regardless of whether * the type is marked as serializable or was designed for it (so be careful!). */ -fun T.serialize(kryo: Kryo = threadLocalP2PKryo(), internalOnly: Boolean = false): SerializedBytes { +fun T.serialize(kryo: KryoPool = p2PKryo(), internalOnly: Boolean = false): SerializedBytes { + return kryo.run { k -> serialize(k, internalOnly) } +} + +fun T.serialize(kryo: Kryo, internalOnly: Boolean = false): SerializedBytes { val stream = ByteArrayOutputStream() Output(stream).use { it.writeBytes(KryoHeaderV0_1.bytes) @@ -399,14 +410,12 @@ object KotlinObjectSerializer : Serializer() { } // No ClassResolver only constructor. MapReferenceResolver is the default as used by Kryo in other constructors. -fun createInternalKryo(k: Kryo = CordaKryo(makeNoWhitelistClassResolver())): Kryo { - return DefaultKryoCustomizer.customize(k) -} +private val internalKryoPool = KryoPool.Builder { DefaultKryoCustomizer.customize(CordaKryo(makeNoWhitelistClassResolver())) }.build() +private val kryoPool = KryoPool.Builder { DefaultKryoCustomizer.customize(CordaKryo(makeStandardClassResolver())) }.build() // No ClassResolver only constructor. MapReferenceResolver is the default as used by Kryo in other constructors. -fun createKryo(k: Kryo = CordaKryo(makeStandardClassResolver())): Kryo { - return DefaultKryoCustomizer.customize(k) -} +@VisibleForTesting +fun createTestKryo(): Kryo = DefaultKryoCustomizer.customize(CordaKryo(makeNoWhitelistClassResolver())) /** * We need to disable whitelist checking during calls from our Kryo code to register a serializer, since it checks @@ -475,21 +484,20 @@ inline fun Kryo.noReferencesWithin() { class NoReferencesSerializer(val baseSerializer: Serializer) : Serializer() { override fun read(kryo: Kryo, input: Input, type: Class): T { - val previousValue = kryo.setReferences(false) - try { - return baseSerializer.read(kryo, input, type) - } finally { - kryo.references = previousValue - } + return kryo.withoutReferences { baseSerializer.read(kryo, input, type) } } override fun write(kryo: Kryo, output: Output, obj: T) { - val previousValue = kryo.setReferences(false) - try { - baseSerializer.write(kryo, output, obj) - } finally { - kryo.references = previousValue - } + kryo.withoutReferences { baseSerializer.write(kryo, output, obj) } + } +} + +fun Kryo.withoutReferences(block: () -> T): T { + val previousValue = setReferences(false) + try { + return block() + } finally { + references = previousValue } } @@ -524,17 +532,6 @@ var Kryo.attachmentStorage: AttachmentStorage? this.context.put(ATTACHMENT_STORAGE, value) } - -//TODO: It's a little workaround for serialization of HashMaps inside contract states. -//Used in Merkle tree calculation. It doesn't cover all the cases of unstable serialization format. -fun extendKryoHash(kryo: Kryo): Kryo { - return kryo.apply { - references = false - register(LinkedHashMap::class.java, MapSerializer()) - register(HashMap::class.java, OrderedSerializer) - } -} - object OrderedSerializer : Serializer>() { override fun write(kryo: Kryo, output: Output, obj: HashMap) { //Change a HashMap to LinkedHashMap. diff --git a/core/src/main/kotlin/net/corda/core/serialization/SerializationToken.kt b/core/src/main/kotlin/net/corda/core/serialization/SerializationToken.kt index e2e0a16b4d..798db529d9 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/SerializationToken.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/SerializationToken.kt @@ -5,7 +5,7 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import java.util.* +import com.esotericsoftware.kryo.pool.KryoPool /** * The interfaces and classes in this file allow large, singleton style classes to @@ -36,8 +36,6 @@ interface SerializationToken { /** * A Kryo serializer for [SerializeAsToken] implementations. - * - * This is registered in [createKryo]. */ class SerializeAsTokenSerializer : Serializer() { override fun write(kryo: Kryo, output: Output, obj: T) { @@ -76,8 +74,8 @@ class SerializeAsTokenSerializer : Serializer() { * Then it is a case of using the companion object methods on [SerializeAsTokenSerializer] to set and clear context as necessary * on the Kryo instance when serializing to enable/disable tokenization. */ -class SerializeAsTokenContext(toBeTokenized: Any, kryo: Kryo = createKryo()) { - internal val tokenToTokenized = HashMap() +class SerializeAsTokenContext(toBeTokenized: Any, kryoPool: KryoPool) { + internal val tokenToTokenized = mutableMapOf() internal var readOnly = false init { @@ -90,9 +88,11 @@ class SerializeAsTokenContext(toBeTokenized: Any, kryo: Kryo = createKryo()) { * accidental registrations from occuring as these could not be deserialized in a deserialization-first * scenario if they are not part of this iniital context construction serialization. */ - SerializeAsTokenSerializer.setContext(kryo, this) - toBeTokenized.serialize(kryo) - SerializeAsTokenSerializer.clearContext(kryo) + kryoPool.run { kryo -> + SerializeAsTokenSerializer.setContext(kryo, this) + toBeTokenized.serialize(kryo) + SerializeAsTokenSerializer.clearContext(kryo) + } readOnly = true } } diff --git a/core/src/main/kotlin/net/corda/core/transactions/MerkleTransaction.kt b/core/src/main/kotlin/net/corda/core/transactions/MerkleTransaction.kt index 96609692c0..dd080f5b1b 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/MerkleTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/MerkleTransaction.kt @@ -3,13 +3,12 @@ package net.corda.core.transactions import net.corda.core.contracts.* import net.corda.core.crypto.* import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.createKryo -import net.corda.core.serialization.extendKryoHash +import net.corda.core.serialization.p2PKryo import net.corda.core.serialization.serialize +import net.corda.core.serialization.withoutReferences fun serializedHash(x: T): SecureHash { - val kryo = extendKryoHash(createKryo()) // Dealing with HashMaps inside states. - return x.serialize(kryo).hash + return p2PKryo().run { kryo -> kryo.withoutReferences { x.serialize(kryo).hash } } } /** diff --git a/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt b/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt index 244d06b192..77e0c5cd28 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/WireTransaction.kt @@ -1,6 +1,6 @@ package net.corda.core.transactions -import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.contracts.* import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.MerkleTree @@ -10,8 +10,8 @@ import net.corda.core.indexOfOrThrow import net.corda.core.node.ServicesForResolution import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize +import net.corda.core.serialization.p2PKryo import net.corda.core.serialization.serialize -import net.corda.core.serialization.threadLocalP2PKryo import net.corda.core.utilities.Emoji import java.security.PublicKey @@ -45,7 +45,7 @@ class WireTransaction( override val id: SecureHash by lazy { merkleTree.hash } companion object { - fun deserialize(data: SerializedBytes, kryo: Kryo = threadLocalP2PKryo()): WireTransaction { + fun deserialize(data: SerializedBytes, kryo: KryoPool = p2PKryo()): WireTransaction { val wtx = data.bytes.deserialize(kryo) wtx.cachedBytes = data return wtx diff --git a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt index 924042f586..cbdb8704bc 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt @@ -88,7 +88,7 @@ class ProgressTracker(vararg steps: Step) { @CordaSerializable private data class Child(val tracker: ProgressTracker, @Transient val subscription: Subscription?) - private val childProgressTrackers = HashMap() + private val childProgressTrackers = mutableMapOf() init { steps.forEach { diff --git a/core/src/test/kotlin/net/corda/core/crypto/PartialMerkleTreeTest.kt b/core/src/test/kotlin/net/corda/core/crypto/PartialMerkleTreeTest.kt index fa68ffb1d6..f6dc90b2dc 100644 --- a/core/src/test/kotlin/net/corda/core/crypto/PartialMerkleTreeTest.kt +++ b/core/src/test/kotlin/net/corda/core/crypto/PartialMerkleTreeTest.kt @@ -1,18 +1,20 @@ package net.corda.core.crypto -import com.esotericsoftware.kryo.serializers.MapSerializer +import com.esotericsoftware.kryo.KryoException import net.corda.contracts.asset.Cash import net.corda.core.contracts.* import net.corda.core.crypto.SecureHash.Companion.zeroHash -import net.corda.core.serialization.* -import net.corda.core.transactions.* -import net.corda.core.utilities.* +import net.corda.core.serialization.p2PKryo +import net.corda.core.serialization.serialize +import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.DUMMY_NOTARY +import net.corda.core.utilities.DUMMY_PUBKEY_1 +import net.corda.core.utilities.TEST_TX_TIME import net.corda.testing.MEGA_CORP import net.corda.testing.MEGA_CORP_PUBKEY import net.corda.testing.ledger import org.junit.Test -import java.util.* import kotlin.test.* class PartialMerkleTreeTest { @@ -208,15 +210,12 @@ class PartialMerkleTreeTest { assertFalse(pmt.verify(wrongRoot, inclHashes)) } - @Test - fun `hash map serialization`() { + @Test(expected = KryoException::class) + fun `hash map serialization not allowed`() { val hm1 = hashMapOf("a" to 1, "b" to 2, "c" to 3, "e" to 4) - assert(serializedHash(hm1) == serializedHash(hm1.serialize().deserialize())) // It internally uses the ordered HashMap extension. - val kryo = extendKryoHash(createKryo()) - assertTrue(kryo.getSerializer(HashMap::class.java) is OrderedSerializer) - assertTrue(kryo.getSerializer(LinkedHashMap::class.java) is MapSerializer) - val hm2 = hm1.serialize(kryo).deserialize(kryo) - assert(hm1.hashCode() == hm2.hashCode()) + p2PKryo().run { kryo -> + hm1.serialize(kryo) + } } private fun makeSimpleCashWtx(notary: Party, timestamp: Timestamp? = null, attachments: List = emptyList()): WireTransaction { diff --git a/core/src/test/kotlin/net/corda/core/node/AttachmentClassLoaderTests.kt b/core/src/test/kotlin/net/corda/core/node/AttachmentClassLoaderTests.kt index f37a1680c1..8583abb27a 100644 --- a/core/src/test/kotlin/net/corda/core/node/AttachmentClassLoaderTests.kt +++ b/core/src/test/kotlin/net/corda/core/node/AttachmentClassLoaderTests.kt @@ -1,5 +1,6 @@ package net.corda.core.node +import com.esotericsoftware.kryo.Kryo import net.corda.core.contracts.* import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party @@ -11,7 +12,9 @@ import net.corda.core.utilities.DUMMY_NOTARY import net.corda.testing.MEGA_CORP import net.corda.testing.node.MockAttachmentStorage import org.apache.commons.io.IOUtils +import org.junit.After import org.junit.Assert +import org.junit.Before import org.junit.Test import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream @@ -75,6 +78,21 @@ class AttachmentClassLoaderTests { class ClassLoaderForTests : URLClassLoader(arrayOf(ISOLATED_CONTRACTS_JAR_PATH), FilteringClassLoader) + lateinit var kryo: Kryo + lateinit var kryo2: Kryo + + @Before + fun setup() { + kryo = p2PKryo().borrow() + kryo2 = p2PKryo().borrow() + } + + @After + fun teardown() { + p2PKryo().release(kryo) + p2PKryo().release(kryo2) + } + @Test fun `dynamically load AnotherDummyContract from isolated contracts jar`() { val child = ClassLoaderForTests() @@ -205,7 +223,6 @@ class AttachmentClassLoaderTests { val cl = AttachmentsClassLoader(arrayOf(att0, att1, att2).map { storage.openAttachment(it)!! }, FilteringClassLoader) - val kryo = createKryo() kryo.classLoader = cl kryo.addToWhitelist(contract.javaClass) @@ -224,7 +241,6 @@ class AttachmentClassLoaderTests { assertNotNull(data.contract) - val kryo2 = createKryo() kryo2.addToWhitelist(data.contract.javaClass) val bytes = data.serialize(kryo2) @@ -236,7 +252,6 @@ class AttachmentClassLoaderTests { val cl = AttachmentsClassLoader(arrayOf(att0, att1, att2).map { storage.openAttachment(it)!! }, FilteringClassLoader) - val kryo = createKryo() kryo.classLoader = cl kryo.addToWhitelist(Class.forName("net.corda.contracts.isolated.AnotherDummyContract", true, cl)) @@ -263,7 +278,6 @@ class AttachmentClassLoaderTests { val contract = contractClass.newInstance() as DummyContractBackdoor val tx = contract.generateInitial(MEGA_CORP.ref(0), 42, DUMMY_NOTARY) val storage = MockAttachmentStorage() - val kryo = createKryo() kryo.addToWhitelist(contract.javaClass) kryo.addToWhitelist(Class.forName("net.corda.contracts.isolated.AnotherDummyContract\$State", true, child)) kryo.addToWhitelist(Class.forName("net.corda.contracts.isolated.AnotherDummyContract\$Commands\$Create", true, child)) @@ -279,7 +293,6 @@ class AttachmentClassLoaderTests { val bytes = wireTransaction.serialize(kryo) - val kryo2 = createKryo() // use empty attachmentStorage kryo2.attachmentStorage = storage @@ -297,7 +310,6 @@ class AttachmentClassLoaderTests { val contract = contractClass.newInstance() as DummyContractBackdoor val tx = contract.generateInitial(MEGA_CORP.ref(0), 42, DUMMY_NOTARY) val storage = MockAttachmentStorage() - val kryo = createKryo() // todo - think about better way to push attachmentStorage down to serializer kryo.attachmentStorage = storage @@ -310,7 +322,6 @@ class AttachmentClassLoaderTests { val bytes = wireTransaction.serialize(kryo) - val kryo2 = createKryo() // use empty attachmentStorage kryo2.attachmentStorage = MockAttachmentStorage() diff --git a/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt b/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt index ac4e8ebf6a..4b25645434 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt @@ -1,5 +1,6 @@ package net.corda.core.serialization +import com.esotericsoftware.kryo.Kryo import com.google.common.primitives.Ints import net.corda.core.crypto.* import net.corda.core.messaging.Ack @@ -7,6 +8,8 @@ import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.bouncycastle.jce.provider.BouncyCastleProvider import org.bouncycastle.pqc.jcajce.provider.BouncyCastlePQCProvider +import org.junit.After +import org.junit.Before import org.junit.Test import java.io.InputStream import java.security.Security @@ -16,7 +19,17 @@ import kotlin.test.assertEquals class KryoTests { - private val kryo = createKryo() + private lateinit var kryo: Kryo + + @Before + fun setup() { + kryo = p2PKryo().borrow() + } + + @After + fun teardown() { + p2PKryo().release(kryo) + } @Test fun ok() { diff --git a/core/src/test/kotlin/net/corda/core/serialization/SerializationTokenTest.kt b/core/src/test/kotlin/net/corda/core/serialization/SerializationTokenTest.kt index 23ab02a725..7b16b654a1 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/SerializationTokenTest.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/SerializationTokenTest.kt @@ -15,12 +15,13 @@ class SerializationTokenTest { @Before fun setup() { - kryo = threadLocalStorageKryo() + kryo = storageKryo().borrow() } @After fun cleanup() { SerializeAsTokenSerializer.clearContext(kryo) + storageKryo().release(kryo) } // Large tokenizable object so we can tell from the smaller number of serialized bytes it was actually tokenized @@ -38,7 +39,7 @@ class SerializationTokenTest { @Test fun `write token and read tokenizable`() { val tokenizableBefore = LargeTokenizable() - val context = SerializeAsTokenContext(tokenizableBefore, kryo) + val context = SerializeAsTokenContext(tokenizableBefore, storageKryo()) SerializeAsTokenSerializer.setContext(kryo, context) val serializedBytes = tokenizableBefore.serialize(kryo) assertThat(serializedBytes.size).isLessThan(tokenizableBefore.numBytes) @@ -51,7 +52,7 @@ class SerializationTokenTest { @Test fun `write and read singleton`() { val tokenizableBefore = UnitSerializeAsToken() - val context = SerializeAsTokenContext(tokenizableBefore, kryo) + val context = SerializeAsTokenContext(tokenizableBefore, storageKryo()) SerializeAsTokenSerializer.setContext(kryo, context) val serializedBytes = tokenizableBefore.serialize(kryo) val tokenizableAfter = serializedBytes.deserialize(kryo) @@ -61,7 +62,7 @@ class SerializationTokenTest { @Test(expected = UnsupportedOperationException::class) fun `new token encountered after context init`() { val tokenizableBefore = UnitSerializeAsToken() - val context = SerializeAsTokenContext(emptyList(), kryo) + val context = SerializeAsTokenContext(emptyList(), storageKryo()) SerializeAsTokenSerializer.setContext(kryo, context) tokenizableBefore.serialize(kryo) } @@ -69,9 +70,9 @@ class SerializationTokenTest { @Test(expected = UnsupportedOperationException::class) fun `deserialize unregistered token`() { val tokenizableBefore = UnitSerializeAsToken() - val context = SerializeAsTokenContext(emptyList(), kryo) + val context = SerializeAsTokenContext(emptyList(), storageKryo()) SerializeAsTokenSerializer.setContext(kryo, context) - val serializedBytes = tokenizableBefore.toToken(SerializeAsTokenContext(emptyList(), kryo)).serialize(kryo) + val serializedBytes = tokenizableBefore.toToken(SerializeAsTokenContext(emptyList(), storageKryo())).serialize(kryo) serializedBytes.deserialize(kryo) } @@ -84,7 +85,7 @@ class SerializationTokenTest { @Test(expected = KryoException::class) fun `deserialize non-token`() { val tokenizableBefore = UnitSerializeAsToken() - val context = SerializeAsTokenContext(tokenizableBefore, kryo) + val context = SerializeAsTokenContext(tokenizableBefore, storageKryo()) SerializeAsTokenSerializer.setContext(kryo, context) val stream = ByteArrayOutputStream() Output(stream).use { @@ -106,7 +107,7 @@ class SerializationTokenTest { @Test(expected = KryoException::class) fun `token returns unexpected type`() { val tokenizableBefore = WrongTypeSerializeAsToken() - val context = SerializeAsTokenContext(tokenizableBefore, kryo) + val context = SerializeAsTokenContext(tokenizableBefore, storageKryo()) SerializeAsTokenSerializer.setContext(kryo, context) val serializedBytes = tokenizableBefore.serialize(kryo) serializedBytes.deserialize(kryo) diff --git a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt index 01fc410c0d..8b56b4ad13 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt @@ -4,7 +4,7 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoSerializable import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output -import net.corda.core.serialization.createInternalKryo +import net.corda.core.serialization.createTestKryo import net.corda.core.serialization.serialize import org.junit.Before import org.junit.Test @@ -106,7 +106,7 @@ class ProgressTrackerTest { } } - val kryo = createInternalKryo().apply { + val kryo = createTestKryo().apply { // This is required to make sure Kryo walks through the auto-generated members for the lambda below. fieldSerializerConfig.isIgnoreSyntheticFields = false } diff --git a/node/src/main/kotlin/net/corda/node/serialization/DefaultWhitelist.kt b/node/src/main/kotlin/net/corda/node/serialization/DefaultWhitelist.kt index f94e7378c0..fd4e1807b5 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/DefaultWhitelist.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/DefaultWhitelist.kt @@ -11,6 +11,9 @@ import java.time.LocalDate import java.time.Period import java.util.* +/** + * NOTE: We do not whitelist [HashMap] or [HashSet] since they are unstable under serialization. + */ class DefaultWhitelist : CordaPluginRegistry() { override fun customizeSerialization(custom: SerializationCustomization): Boolean { custom.apply { @@ -41,7 +44,6 @@ class DefaultWhitelist : CordaPluginRegistry() { addToWhitelist(java.time.Instant::class.java) addToWhitelist(java.time.LocalDate::class.java) addToWhitelist(java.util.Collections.singletonMap("A", "B").javaClass) - addToWhitelist(java.util.HashMap::class.java) addToWhitelist(java.util.LinkedHashMap::class.java) addToWhitelist(BigDecimal::class.java) addToWhitelist(LocalDate::class.java) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt index 9afacf8ebd..5307f14861 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClientImpl.kt @@ -113,18 +113,20 @@ class CordaRPCClientImpl(private val session: ClientSession, @GuardedBy("sessionLock") private val addressToQueuedObservables = CacheBuilder.newBuilder().weakValues().build() // This is used to hold a reference counted hard reference when we know there are subscribers. - private val hardReferencesToQueuedObservables = mutableSetOf() + private val hardReferencesToQueuedObservables = Collections.synchronizedSet(mutableSetOf()) private var producer: ClientProducer? = null - private inner class ObservableDeserializer(private val qName: String, - private val rpcName: String, - private val rpcLocation: Throwable) : Serializer>() { + class ObservableDeserializer() : Serializer>() { override fun read(kryo: Kryo, input: Input, type: Class>): Observable { + val qName = kryo.context[RPCKryoQNameKey] as String + val rpcName = kryo.context[RPCKryoMethodNameKey] as String + val rpcLocation = kryo.context[RPCKryoLocationKey] as Throwable + val rpcClient = kryo.context[RPCKryoClientKey] as CordaRPCClientImpl val handle = input.readInt(true) - val ob = sessionLock.withLock { - addressToQueuedObservables.getIfPresent(qName) ?: QueuedObservable(qName, rpcName, rpcLocation, this).apply { - addressToQueuedObservables.put(qName, this) + val ob = rpcClient.sessionLock.withLock { + rpcClient.addressToQueuedObservables.getIfPresent(qName) ?: rpcClient.QueuedObservable(qName, rpcName, rpcLocation).apply { + rpcClient.addressToQueuedObservables.put(qName, this) } } val result = ob.getForHandle(handle) @@ -182,9 +184,17 @@ class CordaRPCClientImpl(private val session: ClientSession, checkMethodVersion(method) - // sendRequest may return a reconfigured Kryo if the method returns observables. - val kryo: Kryo = sendRequest(args, location, method) ?: createRPCKryo() - val next: ErrorOr<*> = receiveResponse(kryo, method, timeout) + val msg: ClientMessage = createMessage(method) + // We could of course also check the return type of the method to see if it's Observable, but I'd + // rather haved the annotation be used consistently. + val returnsObservables = method.isAnnotationPresent(RPCReturnsObservables::class.java) + val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else createRPCKryoForDeserialization(this@CordaRPCClientImpl) + val next: ErrorOr<*> = try { + sendRequest(args, msg) + receiveResponse(kryo, method, timeout) + } finally { + releaseRPCKryoForDeserialization(kryo) + } rpcLog.debug { "<- RPC <- ${method.name} = $next" } return unwrapOrThrow(next) } @@ -215,22 +225,18 @@ class CordaRPCClientImpl(private val session: ClientSession, return next } - private fun sendRequest(args: Array?, location: Throwable, method: Method): Kryo? { - // We could of course also check the return type of the method to see if it's Observable, but I'd - // rather haved the annotation be used consistently. - val returnsObservables = method.isAnnotationPresent(RPCReturnsObservables::class.java) - + private fun sendRequest(args: Array?, msg: ClientMessage) { sessionLock.withLock { - val msg: ClientMessage = createMessage(method) - val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else null + val argsKryo = createRPCKryoForDeserialization(this@CordaRPCClientImpl) val serializedArgs = try { - (args ?: emptyArray()).serialize(createRPCKryo()) + (args ?: emptyArray()).serialize(argsKryo) } catch (e: KryoException) { throw RPCException("Could not serialize RPC arguments", e) + } finally { + releaseRPCKryoForDeserialization(argsKryo) } msg.writeBodyBufferBytes(serializedArgs.bytes) producer!!.send(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, msg) - return kryo } } @@ -242,7 +248,7 @@ class CordaRPCClientImpl(private val session: ClientSession, msg.putLongProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsId) // And make sure that we deserialise observable handles so that they're linked to the right // queue. Also record a bit of metadata for debugging purposes. - return createRPCKryo(observableSerializer = ObservableDeserializer(observationsQueueName, method.name, location)) + return createRPCKryoForDeserialization(this@CordaRPCClientImpl, observationsQueueName, method.name, location) } private fun createMessage(method: Method): ClientMessage { @@ -278,8 +284,7 @@ class CordaRPCClientImpl(private val session: ClientSession, @ThreadSafe private inner class QueuedObservable(private val qName: String, private val rpcName: String, - private val rpcLocation: Throwable, - private val observableDeserializer: ObservableDeserializer) { + private val rpcLocation: Throwable) { private val root = PublishSubject.create() private val rootShared = root.doOnUnsubscribe { close() }.share() @@ -345,8 +350,10 @@ class CordaRPCClientImpl(private val session: ClientSession, private fun deliver(msg: ClientMessage) { msg.acknowledge() - val kryo = createRPCKryo(observableSerializer = observableDeserializer) - val received: MarshalledObservation = msg.deserialize(kryo) + val kryo = createRPCKryoForDeserialization(this@CordaRPCClientImpl, qName, rpcName, rpcLocation) + val received: MarshalledObservation = try { msg.deserialize(kryo) } finally { + releaseRPCKryoForDeserialization(kryo) + } rpcLog.debug { "<- Observable [$rpcName] <- Received $received" } synchronized(observables) { // Force creation of the buffer if it doesn't already exist. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt index daf96cb592..964cbb53e6 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt @@ -42,6 +42,8 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v private val queueToSubscription = HashMultimap.create() + private val handleCounter = AtomicInteger() + // Created afresh for every RPC that is annotated as returning observables. Every time an observable is // encountered either in the RPC response or in an object graph that is being emitted by one of those // observables, the handle counter is incremented and the server-side observable is subscribed to. The @@ -49,41 +51,48 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v // // When the observables are deserialised on the client side, the handle is read from the byte stream and // the queue is filtered to extract just those observations. - private inner class ObservableSerializer(private val toQName: String) : Serializer>() { - private val handleCounter = AtomicInteger() + class ObservableSerializer() : Serializer>() { + private fun toQName(kryo: Kryo): String = kryo.context[RPCKryoQNameKey] as String + private fun toDispatcher(kryo: Kryo): RPCDispatcher = kryo.context[RPCKryoDispatcherKey] as RPCDispatcher override fun read(kryo: Kryo, input: Input, type: Class>): Observable { throw UnsupportedOperationException("not implemented") } override fun write(kryo: Kryo, output: Output, obj: Observable) { - val handle = handleCounter.andIncrement + val qName = toQName(kryo) + val dispatcher = toDispatcher(kryo) + val handle = dispatcher.handleCounter.andIncrement output.writeInt(handle, true) // Observables can do three kinds of callback: "next" with a content object, "completed" and "error". // Materializing the observable converts these three kinds of callback into a single stream of objects // representing what happened, which is useful for us to send over the wire. val subscription = obj.materialize().subscribe { materialised: Notification -> - val newKryo = createRPCKryo(observableSerializer = this@ObservableSerializer) - val bits = MarshalledObservation(handle, materialised).serialize(newKryo) + val newKryo = createRPCKryoForSerialization(qName, dispatcher) + val bits = try { MarshalledObservation(handle, materialised).serialize(newKryo) } finally { + releaseRPCKryoForSerialization(newKryo) + } rpcLog.debug("RPC sending observation: $materialised") - send(bits, toQName) + dispatcher.send(bits, qName) } - synchronized(queueToSubscription) { - queueToSubscription.put(toQName, subscription) + synchronized(dispatcher.queueToSubscription) { + dispatcher.queueToSubscription.put(qName, subscription) } } } fun dispatch(msg: ClientRPCRequestMessage) { val (argsBytes, replyTo, observationsTo, methodName) = msg - val kryo = createRPCKryo(observableSerializer = if (observationsTo != null) ObservableSerializer(observationsTo) else null) val response: ErrorOr = ErrorOr.catch { val method = methodTable[methodName] ?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?") if (method.isAnnotationPresent(RPCReturnsObservables::class.java) && observationsTo == null) throw RPCException("Received RPC without any destination for observations, but the RPC returns observables") - val args = argsBytes.deserialize(kryo) + val kryo = createRPCKryoForSerialization(observationsTo, this) + val args = try { argsBytes.deserialize(kryo) } finally { + releaseRPCKryoForSerialization(kryo) + } rpcLog.debug { "-> RPC -> $methodName(${args.joinToString()}) [reply to $replyTo]" } @@ -95,13 +104,15 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v } rpcLog.debug { "<- RPC <- $methodName = $response " } - // Serialise, or send back a simple serialised ErrorOr structure if we couldn't do it. + val kryo = createRPCKryoForSerialization(observationsTo, this) val responseBits = try { response.serialize(kryo) } catch (e: KryoException) { rpcLog.error("Failed to respond to inbound RPC $methodName", e) ErrorOr.of(e).serialize(kryo) + } finally { + releaseRPCKryoForSerialization(kryo) } send(responseBits, replyTo) } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index 828e934259..5662b8d295 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -7,6 +7,7 @@ import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.util.concurrent.ListenableFuture import net.corda.core.flows.FlowException import net.corda.core.serialization.* @@ -88,10 +89,16 @@ object ClassSerializer : Serializer>() { @CordaSerializable class PermissionException(msg: String) : RuntimeException(msg) +object RPCKryoClientKey +object RPCKryoDispatcherKey +object RPCKryoQNameKey +object RPCKryoMethodNameKey +object RPCKryoLocationKey + // The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly. // This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes, // because we can see everything we're using in one place. -private class RPCKryo(observableSerializer: Serializer>? = null) : CordaKryo(makeStandardClassResolver()) { +private class RPCKryo(observableSerializer: Serializer>) : CordaKryo(makeStandardClassResolver()) { init { DefaultKryoCustomizer.customize(this) @@ -99,49 +106,68 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(Class::class.java, ClassSerializer) register(MultipartStream.ItemInputStream::class.java, InputStreamSerializer) register(MarshalledObservation::class.java, ImmutableClassSerializer(MarshalledObservation::class)) - } - - // TODO: workaround to prevent Observable registration conflict when using plugin registered kyro classes - private val observableRegistration: Registration? = observableSerializer?.let { register(Observable::class.java, it, 10000) } - - private val listenableFutureRegistration: Registration? = observableSerializer?.let { - // Register ListenableFuture by making use of Observable serialisation. - // TODO Serialisation could be made more efficient as a future can only emit one value (or exception) + register(Observable::class.java, observableSerializer) @Suppress("UNCHECKED_CAST") register(ListenableFuture::class, - read = { kryo, input -> it.read(kryo, input, Observable::class.java as Class>).toFuture() }, - write = { kryo, output, obj -> it.write(kryo, output, obj.toObservable()) } + read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java as Class>).toFuture() }, + write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) } + ) + register( + FlowException::class, + read = { kryo, input -> + val message = input.readString() + val cause = kryo.readObjectOrNull(input, Throwable::class.java) + FlowException(message, cause) + }, + write = { kryo, output, obj -> + // The subclass may have overridden toString so we use that + val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message + output.writeString(message) + kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java) + } ) } - // Avoid having to worry about the subtypes of FlowException by converting all of them to just FlowException. - // This is a temporary hack until a proper serialisation mechanism is in place. - private val flowExceptionRegistration: Registration = register( - FlowException::class, - read = { kryo, input -> - val message = input.readString() - val cause = kryo.readObjectOrNull(input, Throwable::class.java) - FlowException(message, cause) - }, - write = { kryo, output, obj -> - // The subclass may have overridden toString so we use that - val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message - output.writeString(message) - kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java) - } - ) - override fun getRegistration(type: Class<*>): Registration { - if (Observable::class.java.isAssignableFrom(type)) - return observableRegistration ?: - throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") - if (ListenableFuture::class.java.isAssignableFrom(type)) - return listenableFutureRegistration ?: - throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + val annotated = context[RPCKryoQNameKey] != null + if (Observable::class.java.isAssignableFrom(type)) { + return if (annotated) super.getRegistration(Observable::class.java) + else throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + } + if (ListenableFuture::class.java.isAssignableFrom(type)) { + return if (annotated) super.getRegistration(ListenableFuture::class.java) + else throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables") + } if (FlowException::class.java.isAssignableFrom(type)) - return flowExceptionRegistration + return super.getRegistration(FlowException::class.java) return super.getRegistration(type) } } -fun createRPCKryo(observableSerializer: Serializer>? = null): Kryo = RPCKryo(observableSerializer) +private val rpcSerKryoPool = KryoPool.Builder { RPCKryo(RPCDispatcher.ObservableSerializer()) }.build() + +fun createRPCKryoForSerialization(qName: String? = null, dispatcher: RPCDispatcher? = null): Kryo { + val kryo = rpcSerKryoPool.borrow() + kryo.context.put(RPCKryoQNameKey, qName) + kryo.context.put(RPCKryoDispatcherKey, dispatcher) + return kryo +} + +fun releaseRPCKryoForSerialization(kryo: Kryo) { + rpcSerKryoPool.release(kryo) +} + +private val rpcDesKryoPool = KryoPool.Builder { RPCKryo(CordaRPCClientImpl.ObservableDeserializer()) }.build() + +fun createRPCKryoForDeserialization(rpcClient: CordaRPCClientImpl, qName: String? = null, rpcName: String? = null, rpcLocation: Throwable? = null): Kryo { + val kryo = rpcDesKryoPool.borrow() + kryo.context.put(RPCKryoClientKey, rpcClient) + kryo.context.put(RPCKryoQNameKey, qName) + kryo.context.put(RPCKryoMethodNameKey, rpcName) + kryo.context.put(RPCKryoLocationKey, rpcLocation) + return kryo +} + +fun releaseRPCKryoForDeserialization(kryo: Kryo) { + rpcDesKryoPool.release(kryo) +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index bc066032cc..89db282424 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -4,7 +4,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.core.serialization.threadLocalStorageKryo +import net.corda.core.serialization.storageKryo import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.CheckpointStorage import net.corda.node.utilities.* @@ -39,7 +39,7 @@ class DBCheckpointStorage : CheckpointStorage { private val checkpointStorage = synchronizedMap(CheckpointMap()) override fun addCheckpoint(checkpoint: Checkpoint) { - checkpointStorage.put(checkpoint.id, checkpoint.serialize(threadLocalStorageKryo(), true)) + checkpointStorage.put(checkpoint.id, checkpoint.serialize(storageKryo(), true)) } override fun removeCheckpoint(checkpoint: Checkpoint) { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index b8ca8685e9..79d2cffd92 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.collect.HashMultimap import com.google.common.util.concurrent.ListenableFuture import kotlinx.support.jdk8.collections.removeIf @@ -71,6 +72,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor) + private val quasarKryoPool = KryoPool.Builder { + val serializer = Fiber.getFiberSerializer(false) as KryoSerializer + DefaultKryoCustomizer.customize(serializer.kryo) + }.build() + companion object { private val logger = loggerFor() internal val sessionTopic = TopicSession("platform.session") @@ -354,32 +360,23 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes> { - val kryo = quasarKryo() - // add the map of tokens -> tokenizedServices to the kyro context - SerializeAsTokenSerializer.setContext(kryo, serializationContext) - return fiber.serialize(kryo) + return quasarKryo().run { kryo -> + // add the map of tokens -> tokenizedServices to the kyro context + SerializeAsTokenSerializer.setContext(kryo, serializationContext) + fiber.serialize(kryo) + } } private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> { - val kryo = quasarKryo() - // put the map of token -> tokenized into the kryo context - SerializeAsTokenSerializer.setContext(kryo, serializationContext) - return checkpoint.serializedFiber.deserialize(kryo).apply { fromCheckpoint = true } - } - - private fun quasarKryo(): Kryo { - val serializer = Fiber.getFiberSerializer(false) as KryoSerializer - return createKryo(serializer.kryo).apply { - // Because we like to stick a Kryo object in a ThreadLocal to speed things up a bit, we can end up trying to - // serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so - // we avoid it here. This is checkpointing specific. - register(Kryo::class, - read = { kryo, input -> createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) }, - write = { kryo, output, obj -> } - ) + return quasarKryo().run { kryo -> + // put the map of token -> tokenized into the kryo context + SerializeAsTokenSerializer.setContext(kryo, serializationContext) + checkpoint.serializedFiber.deserialize(kryo).apply { fromCheckpoint = true } } } + private fun quasarKryo(): KryoPool = quasarKryoPool + private fun createFiber(logic: FlowLogic): FlowStateMachineImpl { val id = StateMachineRunId.createRandom() return FlowStateMachineImpl(id, logic, scheduler).apply { initFiber(this) } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index a0912009d6..5a0eb29da4 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -8,7 +8,6 @@ import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.contracts.* import net.corda.core.crypto.AbstractParty -import net.corda.core.crypto.AnonymousParty import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.SecureHash import net.corda.core.node.ServiceHub @@ -16,9 +15,9 @@ import net.corda.core.node.services.Vault import net.corda.core.node.services.VaultService import net.corda.core.node.services.unconsumedStates import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.createKryo import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize +import net.corda.core.serialization.storageKryo import net.corda.core.tee import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.WireTransaction @@ -76,8 +75,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P index = it.key.index stateStatus = Vault.StateStatus.UNCONSUMED contractStateClassName = it.value.state.data.javaClass.name - // TODO: revisit Kryo bug when using THREAD_LOCAL_KYRO - contractState = it.value.state.serialize(createKryo()).bytes + contractState = it.value.state.serialize(storageKryo()).bytes notaryName = it.value.state.notary.name notaryKey = it.value.state.notary.owningKey.toBase58String() recordedTime = services.clock.instant() @@ -165,8 +163,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P Sequence{iterator} .map { it -> val stateRef = StateRef(SecureHash.parse(it.txId), it.index) - // TODO: revisit Kryo bug when using THREAD_LOCAL_KRYO - val state = it.contractState.deserialize>(createKryo()) + val state = it.contractState.deserialize>(storageKryo()) StateAndRef(state, stateRef) } } @@ -184,7 +181,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P .and(VaultSchema.VaultStates::index eq it.index) result.get()?.each { val stateRef = StateRef(SecureHash.parse(it.txId), it.index) - val state = it.contractState.deserialize>() + val state = it.contractState.deserialize>(storageKryo()) results += StateAndRef(state, stateRef) } } @@ -353,7 +350,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P while (rs.next()) { val txHash = SecureHash.parse(rs.getString(1)) val index = rs.getInt(2) - val state = rs.getBytes(3).deserialize>(createKryo()) + val state = rs.getBytes(3).deserialize>(storageKryo()) consumedStates.add(StateAndRef(state, StateRef(txHash, index))) } } diff --git a/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt b/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt index e2de392f42..415b816fcf 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/JDBCHashMap.kt @@ -3,7 +3,7 @@ package net.corda.node.utilities import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.core.serialization.threadLocalStorageKryo +import net.corda.core.serialization.storageKryo import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import org.jetbrains.exposed.sql.* @@ -65,7 +65,7 @@ fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit> return blob } -fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(threadLocalStorageKryo(), true), finalizables) +fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(storageKryo(), true), finalizables) fun bytesFromBlob(blob: Blob): SerializedBytes { try { diff --git a/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt b/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt index 6b2b87496a..d1d2be7e09 100644 --- a/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/database/RequeryConfigurationTest.kt @@ -10,8 +10,8 @@ import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.NullPublicKey import net.corda.core.crypto.SecureHash import net.corda.core.node.services.Vault -import net.corda.core.serialization.createKryo import net.corda.core.serialization.serialize +import net.corda.core.serialization.storageKryo import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.DUMMY_NOTARY @@ -128,7 +128,7 @@ class RequeryConfigurationTest { index = txnState.index stateStatus = Vault.StateStatus.UNCONSUMED contractStateClassName = DummyContract.SingleOwnerState::class.java.name - contractState = DummyContract.SingleOwnerState(owner = DUMMY_PUBKEY_1).serialize(createKryo()).bytes + contractState = DummyContract.SingleOwnerState(owner = DUMMY_PUBKEY_1).serialize(storageKryo()).bytes notaryName = txn.tx.notary!!.name notaryKey = txn.tx.notary!!.owningKey.toBase58String() recordedTime = Instant.now() diff --git a/samples/irs-demo/src/test/kotlin/net/corda/irs/testing/IRSTests.kt b/samples/irs-demo/src/test/kotlin/net/corda/irs/testing/IRSTests.kt index 47f8644ef8..18c6623191 100644 --- a/samples/irs-demo/src/test/kotlin/net/corda/irs/testing/IRSTests.kt +++ b/samples/irs-demo/src/test/kotlin/net/corda/irs/testing/IRSTests.kt @@ -1,7 +1,6 @@ package net.corda.irs.testing import net.corda.core.contracts.* -import net.corda.core.node.recordTransactions import net.corda.core.seconds import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY @@ -77,8 +76,8 @@ fun createDummyIRS(irsSelect: Int): InterestRateSwap.State { expression = Expression("( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) -" + "(floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))"), - floatingLegPaymentSchedule = HashMap(), - fixedLegPaymentSchedule = HashMap() + floatingLegPaymentSchedule = mutableMapOf(), + fixedLegPaymentSchedule = mutableMapOf() ) val EUR = currency("EUR") @@ -167,8 +166,8 @@ fun createDummyIRS(irsSelect: Int): InterestRateSwap.State { expression = Expression("( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) -" + "(floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))"), - floatingLegPaymentSchedule = HashMap(), - fixedLegPaymentSchedule = HashMap() + floatingLegPaymentSchedule = mutableMapOf(), + fixedLegPaymentSchedule = mutableMapOf() ) val EUR = currency("EUR") @@ -413,7 +412,7 @@ class IRSTests { @Test fun `ensure failure occurs when no events in fix schedule`() { val irs = singleIRS() - val emptySchedule = HashMap() + val emptySchedule = mutableMapOf() transaction { output() { irs.copy(calculation = irs.calculation.copy(fixedLegPaymentSchedule = emptySchedule)) @@ -427,7 +426,7 @@ class IRSTests { @Test fun `ensure failure occurs when no events in floating schedule`() { val irs = singleIRS() - val emptySchedule = HashMap() + val emptySchedule = mutableMapOf() transaction { output() { irs.copy(calculation = irs.calculation.copy(floatingLegPaymentSchedule = emptySchedule))