From 0123b141d33c0425c2d3cac759e9bd872b7a2282 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Wed, 3 Jan 2024 16:17:02 +0000 Subject: [PATCH] ENT-11328: Update node initialisation to be flexible if JDK modules are not open to reflection --- .ci/api-current.txt | 2 - build.gradle | 4 +- checkpoint-tests/README.md | 9 + checkpoint-tests/build.gradle | 25 ++ .../corda/core}/flows/FastThreadLocalTest.kt | 12 +- .../serialization/kryo/KryoCheckpointTest.kt | 152 +++++++++++ .../serialization/kryo/KryoStreamsTest.kt | 0 .../internal/serialization/kryo/KryoTests.kt | 8 +- .../internal/SerializationTokenTest.kt | 7 +- constants.properties | 3 +- core-1.2/README.md | 2 +- .../FlowExternalOperationInJavaTest.java | 0 .../flows/ContractUpgradeFlowRPCTest.kt | 0 .../flows/FlowExternalAsyncOperationTest.kt | 4 +- .../FlowExternalOperationStartFlowTest.kt | 0 .../flows/FlowExternalOperationTest.kt | 3 +- .../corda/coretests/flows/FlowIsKilledTest.kt | 0 .../corda/coretests/flows/FlowSleepTest.kt | 0 .../net/corda/core/internal/InternalUtils.kt | 84 ++++++ .../net/corda/core/node/NetworkParameters.kt | 28 +- .../core/transactions/LedgerTransaction.kt | 30 +-- experimental/raft-tests/README.md | 5 + experimental/raft-tests/build.gradle | 33 +++ .../raft/RaftTransactionCommitLogTests.kt | 7 +- .../raft}/UniquenessProviderTests.kt | 5 +- .../kryo/CustomIteratorSerializers.kt | 136 ---------- .../kryo/DefaultKryoCustomizer.kt | 76 ++---- .../serialization/kryo/IteratorSerializer.kt | 52 ---- .../kryo/IteratorSerializerFactory.kt | 64 +++++ .../internal/serialization/kryo/Kryo.kt | 252 +++++------------- .../kryo/KryoCheckpointSerializer.kt | 111 +++++++- .../serialization/kryo/KryoStreams.kt | 8 +- .../kryo/LinkedHashMapEntrySerializer.kt | 36 +++ ...yListItrConcurrentModificationException.kt | 122 --------- .../serialization/kryo/KryoCheckpointTest.kt | 171 ------------ .../src/main/resources/node-jvm-args.txt | 6 - .../CustomSerializationSchemeDriverTest.kt | 4 - .../node/internal/telemetry/TelemetryTests.kt | 0 .../services/schema/NodeSchemaServiceTest.kt | 35 +-- .../statemachine/FlowMetadataRecordingTest.kt | 0 .../raft/RaftNotaryServiceTests.kt | 0 .../net/corda/node/internal/AbstractNode.kt | 21 +- .../persistence/DBTransactionStorage.kt | 8 +- .../statemachine/FlowStateMachineImpl.kt | 9 +- .../schema/PersistentStateServiceTests.kt | 52 +++- .../statemachine/FlowOperatorTests.kt | 60 +++-- .../statemachine/FlowSoftLocksTests.kt | 32 +-- .../internal/CordaClassResolverTests.kt | 7 - .../internal/amqp/CollectionSerializer.kt | 15 +- .../internal/amqp/MapSerializer.kt | 15 +- settings.gradle | 2 + 51 files changed, 782 insertions(+), 935 deletions(-) create mode 100644 checkpoint-tests/README.md create mode 100644 checkpoint-tests/build.gradle rename {core-tests/src/test/kotlin/net/corda/coretests => checkpoint-tests/src/test/kotlin/net/corda/core}/flows/FastThreadLocalTest.kt (93%) create mode 100644 checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt rename {node-api => checkpoint-tests}/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreamsTest.kt (100%) rename {node-api => checkpoint-tests}/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt (97%) rename {serialization-tests => checkpoint-tests}/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt (94%) rename core-tests/src/{test => integration-test}/java/net/corda/coretests/flows/FlowExternalOperationInJavaTest.java (100%) rename core-tests/src/{test => integration-test}/kotlin/net/corda/coretests/flows/ContractUpgradeFlowRPCTest.kt (100%) rename core-tests/src/{test => integration-test}/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt (99%) rename core-tests/src/{test => integration-test}/kotlin/net/corda/coretests/flows/FlowExternalOperationStartFlowTest.kt (100%) rename core-tests/src/{test => integration-test}/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt (99%) rename core-tests/src/{test => integration-test}/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt (100%) rename core-tests/src/{test => integration-test}/kotlin/net/corda/coretests/flows/FlowSleepTest.kt (100%) create mode 100644 experimental/raft-tests/README.md create mode 100644 experimental/raft-tests/build.gradle rename {node => experimental/raft-tests}/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt (96%) rename {node/src/test/kotlin/net/corda/node/services/transactions => experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft}/UniquenessProviderTests.kt (99%) delete mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomIteratorSerializers.kt delete mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializer.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializerFactory.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/LinkedHashMapEntrySerializer.kt delete mode 100644 node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/ArrayListItrConcurrentModificationException.kt delete mode 100644 node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt rename node/src/{test => integration-test}/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt (100%) rename node/src/{test => integration-test}/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt (78%) rename node/src/{test => integration-test}/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt (100%) rename node/src/{test => integration-test}/kotlin/net/corda/notary/experimental/raft/RaftNotaryServiceTests.kt (100%) diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 56c1bd80e4..174485f452 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -4624,8 +4624,6 @@ public final class net.corda.core.node.NetworkParameters extends java.lang.Objec @NotNull public String toString() ## -public final class net.corda.core.node.NetworkParametersKt extends java.lang.Object -## @CordaSerializable public final class net.corda.core.node.NodeDiagnosticInfo extends java.lang.Object public (String, String, int, String, java.util.List) diff --git a/build.gradle b/build.gradle index 8f6404af26..652f501416 100644 --- a/build.gradle +++ b/build.gradle @@ -121,7 +121,6 @@ buildscript { ext.fontawesomefx_commons_version = constants.getProperty("fontawesomefxCommonsVersion") ext.fontawesomefx_fontawesome_version = constants.getProperty("fontawesomefxFontawesomeVersion") ext.javaassist_version = constants.getProperty("javaassistVersion") - ext.corda_revision = { try { "git rev-parse HEAD".execute().text.trim() @@ -131,6 +130,8 @@ buildscript { } }() ext.corda_docs_link = "https://docs.corda.net/docs/corda-os/$baseVersion" + ext.node_jvm_args = project(":node:capsule").file("src/main/resources/node-jvm-args.txt").readLines() + repositories { mavenLocal() // Use system environment to activate caching with Artifactory, @@ -310,7 +311,6 @@ allprojects { } tasks.withType(Test).configureEach { - jvmArgs += project(":node:capsule").file("src/main/resources/node-jvm-args.txt").readLines() jvmArgs += "--add-modules=jdk.incubator.foreign" // For the SharedMemoryIncremental forkEvery = 20 ignoreFailures = project.hasProperty('tests.ignoreFailures') ? project.property('tests.ignoreFailures').toBoolean() : false diff --git a/checkpoint-tests/README.md b/checkpoint-tests/README.md new file mode 100644 index 0000000000..049c28d65b --- /dev/null +++ b/checkpoint-tests/README.md @@ -0,0 +1,9 @@ +# Checkpoint tests + +Restoring checkpoints require certain [JDK modules to be open](../node/capsule/src/main/resources/node-jvm-args.txt) (due to the use of +reflection). This isn't an issue for the node, as we can open up these modules via the Capsule and so doesn't impact the user in anyway. For +client code that connects to the node, or uses the Corda API outside of the node, we would rather not mandate that users also have to do +this. So, to ensure we don't accidently do that, we don't add these flags to our tests. + +This module exists for those tests which are not using the out-of-process node driver, but need to test checkpoint deserialisation. The same +node JVM args are used, and so replicates the exact behaviour of checkpoint restoration as the node. diff --git a/checkpoint-tests/build.gradle b/checkpoint-tests/build.gradle new file mode 100644 index 0000000000..60ee00bdff --- /dev/null +++ b/checkpoint-tests/build.gradle @@ -0,0 +1,25 @@ +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'net.corda.plugins.quasar-utils' + +description 'Checkpoint tests' + +dependencies { + testImplementation project(path: ':core', configuration: 'testArtifacts') + testImplementation project(":serialization") + testImplementation project(":node-api") + testImplementation project(':core-test-utils') + testImplementation project(':test-utils') + testImplementation project(":node-driver") + testImplementation "junit:junit:$junit_version" + testImplementation "org.junit.jupiter:junit-jupiter-api:$junit_jupiter_version" + testImplementation "org.assertj:assertj-core:$assertj_version" + testImplementation "org.mockito.kotlin:mockito-kotlin:$mockito_kotlin_version" + testImplementation "com.esotericsoftware:kryo:$kryo_version" + testImplementation "com.google.guava:guava:$guava_version" + testImplementation "io.netty:netty-common:$netty_version" + testImplementation "org.slf4j:slf4j-api:$slf4j_version" +} + +test { + jvmArgs += node_jvm_args +} diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FastThreadLocalTest.kt b/checkpoint-tests/src/test/kotlin/net/corda/core/flows/FastThreadLocalTest.kt similarity index 93% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/FastThreadLocalTest.kt rename to checkpoint-tests/src/test/kotlin/net/corda/core/flows/FastThreadLocalTest.kt index f029492750..03ef2e3879 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FastThreadLocalTest.kt +++ b/checkpoint-tests/src/test/kotlin/net/corda/core/flows/FastThreadLocalTest.kt @@ -1,4 +1,4 @@ -package net.corda.coretests.flows +package net.corda.core.flows import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.FiberExecutorScheduler @@ -11,11 +11,10 @@ import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.rootCause import net.corda.core.utilities.getOrThrow +import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.catchThrowable -import org.hamcrest.Matchers.lessThanOrEqualTo -import org.junit.Assert.assertThat import org.junit.Test -import java.util.* +import java.util.UUID import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicInteger @@ -65,7 +64,7 @@ class FastThreadLocalTest { override fun initialValue() = ExpensiveObj() } runFibers(100, threadLocal::get) // Return value could be anything. - assertThat(expensiveObjCount.get(), lessThanOrEqualTo(3)) + assertThat(expensiveObjCount.get()).isLessThanOrEqualTo(3) } /** @return the number of times a different expensive object was obtained post-suspend. */ @@ -104,7 +103,6 @@ class FastThreadLocalTest { } private fun contentIsNotSerialized(threadLocalGet: () -> UnserializableObj) = scheduled(1, ::FastThreadLocalThread) { - // Use false like AbstractKryoSerializationScheme, the default of true doesn't work at all: val serializer = Fiber.getFiberSerializer(false) val returnValue = UUID.randomUUID() val deserializedFiber = serializer.read(openFuture().let { @@ -133,7 +131,7 @@ class FastThreadLocalTest { obj = null } // In retainObj false case, check this doesn't attempt to serialize fields of currentThread: - Fiber.parkAndSerialize { fiber, _ -> bytesFuture.capture { serializer.write(fiber) } } + Fiber.parkAndCustomSerialize { fiber -> bytesFuture.capture { serializer.write(fiber) } } return returnValue } } diff --git a/checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt b/checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt new file mode 100644 index 0000000000..46b3998401 --- /dev/null +++ b/checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt @@ -0,0 +1,152 @@ +package net.corda.nodeapi.internal.serialization.kryo + +import net.corda.core.serialization.internal.checkpointDeserialize +import net.corda.core.serialization.internal.checkpointSerialize +import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule +import org.assertj.core.api.Assertions.assertThat +import org.junit.Ignore +import org.junit.Rule +import org.junit.Test +import java.time.Instant +import java.util.LinkedList +import kotlin.test.assertEquals + +class KryoCheckpointTest { + companion object { + // A value big enough to trigger any stack overflow issues + private const val SIZE = 10_000 + private const val CHUNK = 2 + } + + @Rule + @JvmField + val serializationRule = CheckpointSerializationEnvironmentRule() + + @Ignore("Kryo optimizes boxed primitives so this does not work. Need to customise ReferenceResolver to stop it doing it.") + @Test(timeout = 300_000) + fun `linked hash map values can checkpoint without error, even with repeats for boxed primitives`() { + var lastValue = 0 + val dummyMap = linkedMapOf() + for (i in 0..SIZE) { + dummyMap[i.toString()] = (i % 10) + } + var it = dummyMap.values.iterator() + while (it.hasNext()) { + lastValue = it.next() + val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) + it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) + } + assertEquals(SIZE % 10, lastValue) + } + + @Test(timeout=300_000) + fun `ArrayList iterator can checkpoint without error`() { + testIteratorCheckpointing(ArrayList()) + } + + @Test(timeout=300_000) + fun `LinkedList iterator can checkpoint without error`() { + testIteratorCheckpointing(LinkedList()) + } + + @Test(timeout=300_000) + fun `HashSet iterator can checkpoint without error`() { + testIteratorCheckpointing(HashSet()) + } + + @Test(timeout=300_000) + fun `LinkedHashSet iterator can checkpoint without error`() { + testIteratorCheckpointing(LinkedHashSet()) + } + + @Test(timeout=300_000) + fun `HashMap iterator can checkpoint without error`() { + testMapIteratorCheckpointing(HashMap()) + } + + @Test(timeout=300_000) + fun `LinkedHashMap iterator can checkpoint without error`() { + testMapIteratorCheckpointing(LinkedHashMap()) + } + + @Test(timeout=300_000) + fun `Instant can checkpoint without error`() { + val original = Instant.now() + assertThat(checkpointRoundtrip(original)).isEqualTo(original) + } + + private fun testIteratorCheckpointing(collection: MutableCollection) { + collection.addAll(0 until SIZE) + testIteratorCheckpointing(collection.iterator()) + if (collection is List<*>) { + testListIteratorCheckpointing(collection) + } + } + + private fun testIteratorCheckpointing(originalIterator: Iterator<*>) { + var endReached = false + for ((_, skip) in testIndices) { + repeat(skip) { + originalIterator.next() + } + val hasNext = originalIterator.hasNext() + val roundtripIterator = checkpointRoundtrip(originalIterator) + assertThat(hasNext).isEqualTo(originalIterator.hasNext()) // Make sure serialising it doesn't change it + assertThat(roundtripIterator.hasNext()).isEqualTo(hasNext) + if (!hasNext) { + endReached = true + break + } + assertThat(roundtripIterator.next()).isEqualTo(originalIterator.next()) + } + assertThat(endReached).isTrue() + } + + private fun testListIteratorCheckpointing(list: List<*>) { + for ((index, _) in testIndices) { + val originalIterator = list.listIterator(index) + while (true) { + val roundtripIterator = checkpointRoundtrip(originalIterator) + assertThat(roundtripIterator.previousIndex()).isEqualTo(originalIterator.previousIndex()) + assertThat(roundtripIterator.hasPrevious()).isEqualTo(originalIterator.hasPrevious()) + if (originalIterator.hasPrevious()) { + assertThat(roundtripIterator.previous()).isEqualTo(originalIterator.previous()) + roundtripIterator.next() + originalIterator.next() + } + assertThat(roundtripIterator.nextIndex()).isEqualTo(originalIterator.nextIndex()) + assertThat(roundtripIterator.hasNext()).isEqualTo(originalIterator.hasNext()) + if (!originalIterator.hasNext()) break + assertThat(roundtripIterator.next()).isEqualTo(originalIterator.next()) + } + } + } + + private fun testMapIteratorCheckpointing(map: MutableMap) { + repeat(SIZE) { index -> + map[index] = index + } + testIteratorCheckpointing(map.keys.iterator()) + testIteratorCheckpointing(map.values.iterator()) + testIteratorCheckpointing(map.entries.iterator()) + } + + private inline fun checkpointRoundtrip(obj: T): T { + val bytes = obj.checkpointSerialize(KRYO_CHECKPOINT_CONTEXT) + return bytes.checkpointDeserialize(KRYO_CHECKPOINT_CONTEXT) + } + + /** + * Return a Sequence of indicies which just iterates over the first and last [CHUNK], otherwise the tests take too long. The second + * value of the [Pair] is the number of elements to skip over from the previous iteration. + */ + private val testIndices: Sequence> + get() = generateSequence(Pair(0, 0)) { (previous, _) -> + when { + previous < CHUNK - 1 -> Pair(previous + 1, 0) + previous == CHUNK - 1 -> Pair(SIZE - CHUNK, SIZE - CHUNK - previous) + previous < SIZE - 1 -> Pair(previous + 1, 0) + else -> null + } + } +} diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreamsTest.kt b/checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreamsTest.kt similarity index 100% rename from node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreamsTest.kt rename to checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreamsTest.kt diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt b/checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt similarity index 97% rename from node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt rename to checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt index 0692abeae5..161555c45a 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt +++ b/checkpoint-tests/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoTests.kt @@ -37,8 +37,6 @@ import net.corda.serialization.internal.encodingNotPermittedFormat import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.TestIdentity import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule -import org.apache.commons.lang3.JavaVersion -import org.apache.commons.lang3.SystemUtils import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.catchThrowable @@ -394,11 +392,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) { val obj = Holder(ByteArray(20000)) val uncompressedSize = obj.checkpointSerialize(context.withEncoding(null)).size val compressedSize = obj.checkpointSerialize(context.withEncoding(CordaSerializationEncoding.SNAPPY)).size - // If these need fixing, sounds like Kryo wire format changed and checkpoints might not survive an upgrade. - if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) - assertEquals(20127, uncompressedSize) - else - assertEquals(20234, uncompressedSize) + assertEquals(20127, uncompressedSize) assertEquals(1095, compressedSize) } } diff --git a/serialization-tests/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt b/checkpoint-tests/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt similarity index 94% rename from serialization-tests/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt rename to checkpoint-tests/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt index 367b1441de..fd877fa160 100644 --- a/serialization-tests/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt +++ b/checkpoint-tests/src/test/kotlin/net/corda/serialization/internal/SerializationTokenTest.kt @@ -1,6 +1,5 @@ package net.corda.serialization.internal -import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.io.Output import net.corda.core.serialization.SerializationToken @@ -14,9 +13,7 @@ import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.serialization.internal.checkpointSerialize import net.corda.core.utilities.OpaqueBytes import net.corda.coretesting.internal.rigorousMock -import net.corda.nodeapi.internal.serialization.kryo.CordaClassResolver -import net.corda.nodeapi.internal.serialization.kryo.CordaKryo -import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer +import net.corda.nodeapi.internal.serialization.kryo.KryoCheckpointSerializer import net.corda.nodeapi.internal.serialization.kryo.kryoMagic import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat @@ -110,7 +107,7 @@ class SerializationTokenTest { val context = serializeAsTokenContext(tokenizableBefore) val testContext = this.context.withTokenContext(context) - val kryo: Kryo = DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(this.context))) + val kryo = KryoCheckpointSerializer.createFiberSerializer(this.context).kryo val stream = ByteArrayOutputStream() Output(stream).use { kryoMagic.writeTo(it) diff --git a/constants.properties b/constants.properties index 1ad8361d59..b0d60fa203 100644 --- a/constants.properties +++ b/constants.properties @@ -17,8 +17,7 @@ platformVersion=14 openTelemetryVersion=1.20.1 openTelemetrySemConvVersion=1.20.1-alpha guavaVersion=28.0-jre -# Quasar version to use with Java 8: -quasarVersion=0.9.0_r3 +quasarVersion=0.9.1_r3-SNAPSHOT dockerJavaVersion=3.2.5 proguardVersion=7.3.1 // bouncy castle version must not be changed on a patch release. Needs a full release test cycle to flush out any issues. diff --git a/core-1.2/README.md b/core-1.2/README.md index 9543551417..078a7a0ae6 100644 --- a/core-1.2/README.md +++ b/core-1.2/README.md @@ -1,4 +1,4 @@ This is a Kotlin 1.2 version of the `core` module, which is consumed by the `verifier` module, for verifying contracts written in Kotlin 1.2. This is just a "shell" module which uses the existing the code in `core` and compiles it with the 1.2 compiler. -To allow `core` to benefit from new APIs introduced since 1.2, those APIs much be copied into this module with the same `kotlin` package. +To allow `core` to benefit from new APIs introduced since 1.2, those APIs must be copied into this module with the same `kotlin` package. diff --git a/core-tests/src/test/java/net/corda/coretests/flows/FlowExternalOperationInJavaTest.java b/core-tests/src/integration-test/java/net/corda/coretests/flows/FlowExternalOperationInJavaTest.java similarity index 100% rename from core-tests/src/test/java/net/corda/coretests/flows/FlowExternalOperationInJavaTest.java rename to core-tests/src/integration-test/java/net/corda/coretests/flows/FlowExternalOperationInJavaTest.java diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/ContractUpgradeFlowRPCTest.kt b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/ContractUpgradeFlowRPCTest.kt similarity index 100% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/ContractUpgradeFlowRPCTest.kt rename to core-tests/src/integration-test/kotlin/net/corda/coretests/flows/ContractUpgradeFlowRPCTest.kt diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt similarity index 99% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt rename to core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt index e847fd5788..adfb382f22 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt +++ b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalAsyncOperationTest.kt @@ -201,11 +201,9 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() { FlowWithExternalProcess(party) { @Suspendable override fun testCode(): Any { - val e = createException() - return await(ExternalAsyncOperation(serviceHub, (SerializableLambda2 { _, _ -> CompletableFuture().apply { - completeExceptionally(e) + completeExceptionally(createException()) } }))) } diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationStartFlowTest.kt b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalOperationStartFlowTest.kt similarity index 100% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationStartFlowTest.kt rename to core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalOperationStartFlowTest.kt diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt similarity index 99% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt rename to core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt index 98a91090da..5215b6ccf6 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt +++ b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowExternalOperationTest.kt @@ -256,8 +256,7 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() { @Suspendable override fun testCode() { - val e = createException() - await(ExternalOperation(serviceHub, (SerializableLambda2 { _, _ -> throw e }))) + await(ExternalOperation(serviceHub, (SerializableLambda2 { _, _ -> throw createException() }))) } private fun createException() = when (exceptionType) { diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt similarity index 100% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt rename to core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowIsKilledTest.kt diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FlowSleepTest.kt b/core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowSleepTest.kt similarity index 100% rename from core-tests/src/test/kotlin/net/corda/coretests/flows/FlowSleepTest.kt rename to core-tests/src/integration-test/kotlin/net/corda/coretests/flows/FlowSleepTest.kt diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 490994e8dd..0c58365d8f 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -2,6 +2,11 @@ package net.corda.core.internal +import com.google.common.collect.ImmutableList +import com.google.common.collect.ImmutableMap +import com.google.common.collect.ImmutableSet +import com.google.common.collect.ImmutableSortedMap +import com.google.common.collect.ImmutableSortedSet import net.corda.core.crypto.Crypto import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.SecureHash @@ -52,6 +57,8 @@ import java.time.Duration import java.time.temporal.Temporal import java.util.Collections import java.util.PrimitiveIterator +import java.util.SortedMap +import java.util.SortedSet import java.util.Spliterator import java.util.Spliterator.DISTINCT import java.util.Spliterator.IMMUTABLE @@ -61,6 +68,8 @@ import java.util.Spliterator.SIZED import java.util.Spliterator.SORTED import java.util.Spliterator.SUBSIZED import java.util.Spliterators +import java.util.TreeMap +import java.util.TreeSet import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit import java.util.jar.JarEntry @@ -160,6 +169,79 @@ inline fun Iterable.flatMapToSet(transform: (T) -> Iterable): Set Collection.toImmutableList(): List { + return when { + isEmpty() -> emptyList() + size == 1 -> listOf(first()) + this is ImmutableList -> this + contains(null as T) -> (this as java.util.Collection).toArray().asList() as List + else -> ImmutableList.copyOf(this) + } +} + +/** + * Returns an immutable, iteration preserving, [Set] which cannot be modified, nor its contents changed indirectly via the receiver. Tries + * to avoid copying data. + */ +@Suppress("UNCHECKED_CAST") +fun Collection.toImmutableSet(): Set { + return when { + isEmpty() -> emptySet() + size == 1 -> setOf(first()) + contains(null as T) -> Collections.unmodifiableSet(LinkedHashSet(this)) + else -> ImmutableSet.copyOf(this) + } +} + +/** + * Returns an immutable [SortedSet] which cannot be modified, nor its contents changed indirectly via the receiver. Tries to avoid copying + * data. + */ +@Suppress("UNCHECKED_CAST") +fun Collection.toImmutableSortedSet(): SortedSet { + return when { + isEmpty() -> Collections.emptySortedSet() + contains(null as T) -> Collections.unmodifiableSortedSet(TreeSet(this)) + else -> ImmutableSortedSet.copyOf(this) + } +} + +/** + * Returns an immutable, iteration preserving, [Map] which cannot be modified, nor its contents changed indirectly via the receiver. Tries + * to avoid copying data. + */ +@Suppress("UNCHECKED_CAST") +fun Map.toImmutableMap(): Map { + return when { + isEmpty() -> emptyMap() + size == 1 -> entries.first().let { Collections.singletonMap(it.key, it.value) } + containsValue(null as V) || containsKey(null as K) -> Collections.unmodifiableMap(LinkedHashMap(this)) + else -> ImmutableMap.copyOf(this) + } +} + +/** + * Returns an immutable [SortedMap] which cannot be modified, nor its contents changed indirectly via the receiver. Tries to avoid copying + * data. + */ +@Suppress("UNCHECKED_CAST") +fun Map.toImmutableSortedMap(): SortedMap { + return when { + isEmpty() -> Collections.emptySortedMap() + containsValue(null as V) || containsKey(null as K) -> Collections.unmodifiableSortedMap(TreeMap(this)) + else -> ImmutableSortedMap.copyOf(this) + } +} + fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options) /** Same as [InputStream.readBytes] but also closes the stream. */ @@ -430,6 +512,8 @@ val Class<*>.packageNameOrNull: String? // This intentionally does not go via `p } } +val Class<*>.fullyQualifiedPackage: String get() = "${module.name}/$packageName" + inline val Class<*>.isAbstractClass: Boolean get() = Modifier.isAbstract(modifiers) inline val Class<*>.isConcreteClass: Boolean get() = !isInterface && !isAbstractClass diff --git a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt index 098b45f802..85849f0b4b 100644 --- a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt +++ b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt @@ -5,6 +5,8 @@ import net.corda.core.crypto.toStringShort import net.corda.core.identity.Party import net.corda.core.internal.noPackageOverlap import net.corda.core.internal.requirePackageValid +import net.corda.core.internal.toImmutableList +import net.corda.core.internal.toImmutableMap import net.corda.core.node.services.AttachmentId import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.DeprecatedConstructorForDeserialization @@ -12,8 +14,6 @@ import net.corda.core.utilities.days import java.security.PublicKey import java.time.Duration import java.time.Instant -import java.util.Collections.unmodifiableList -import java.util.Collections.unmodifiableMap // DOCSTART 1 /** @@ -245,38 +245,20 @@ data class NetworkParameters( fun toImmutable(): NetworkParameters { return NetworkParameters( minimumPlatformVersion = minimumPlatformVersion, - notaries = unmodifiable(notaries), + notaries = notaries.toImmutableList(), maxMessageSize = maxMessageSize, maxTransactionSize = maxTransactionSize, modifiedTime = modifiedTime, epoch = epoch, - whitelistedContractImplementations = unmodifiable(whitelistedContractImplementations) { entry -> - unmodifiableList(entry.value) - }, + whitelistedContractImplementations = whitelistedContractImplementations.mapValues { it.value.toImmutableList() }.toImmutableMap(), eventHorizon = eventHorizon, - packageOwnership = unmodifiable(packageOwnership), + packageOwnership = packageOwnership.toImmutableMap(), recoveryMaximumBackupInterval = recoveryMaximumBackupInterval, confidentialIdentityMinimumBackupInterval = confidentialIdentityMinimumBackupInterval ) } } -private fun unmodifiable(list: List): List { - return if (list.isEmpty()) { - emptyList() - } else { - unmodifiableList(list) - } -} - -private inline fun unmodifiable(map: Map, transform: (Map.Entry) -> V = Map.Entry::value): Map { - return if (map.isEmpty()) { - emptyMap() - } else { - unmodifiableMap(map.mapValues(transform)) - } -} - /** * Data class storing information about notaries available in the network. * @property identity Identity of the notary (note that it can be an identity of the distributed node). diff --git a/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt b/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt index 8845cfca4c..06d32653ef 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt @@ -22,6 +22,7 @@ import net.corda.core.internal.deserialiseCommands import net.corda.core.internal.deserialiseComponentGroup import net.corda.core.internal.eagerDeserialise import net.corda.core.internal.isUploaderTrusted +import net.corda.core.internal.toImmutableList import net.corda.core.internal.uncheckedCast import net.corda.core.internal.verification.AbstractVerifier import net.corda.core.internal.verification.Verifier @@ -32,7 +33,6 @@ import net.corda.core.serialization.SerializationFactory import net.corda.core.serialization.internal.AttachmentsClassLoaderBuilder import net.corda.core.serialization.internal.AttachmentsClassLoaderCache import net.corda.core.utilities.contextLogger -import java.util.Collections.unmodifiableList import java.util.function.Predicate import java.util.function.Supplier @@ -125,18 +125,6 @@ private constructor( companion object { private val logger = contextLogger() - private fun protect(list: List): List { - return list.run { - if (isEmpty()) { - emptyList() - } else { - unmodifiableList(this) - } - } - } - - private fun protectOrNull(list: List?): List? = list?.let(::protect) - @CordaInternal internal fun create( inputs: List>, @@ -168,9 +156,9 @@ private constructor( privacySalt = privacySalt, networkParameters = networkParameters, references = references, - componentGroups = protectOrNull(componentGroups), - serializedInputs = protectOrNull(serializedInputs), - serializedReferences = protectOrNull(serializedReferences), + componentGroups = componentGroups?.toImmutableList(), + serializedInputs = serializedInputs?.toImmutableList(), + serializedReferences = serializedReferences?.toImmutableList(), isAttachmentTrusted = isAttachmentTrusted, verifierFactory = verifierFactory, attachmentsClassLoaderCache = attachmentsClassLoaderCache, @@ -197,16 +185,16 @@ private constructor( references: List>, digestService: DigestService): LedgerTransaction { return LedgerTransaction( - inputs = protect(inputs), - outputs = protect(outputs), - commands = protect(commands), - attachments = protect(attachments), + inputs = inputs.toImmutableList(), + outputs = outputs.toImmutableList(), + commands = commands.toImmutableList(), + attachments = attachments.toImmutableList(), id = id, notary = notary, timeWindow = timeWindow, privacySalt = privacySalt, networkParameters = networkParameters, - references = protect(references), + references = references.toImmutableList(), componentGroups = null, serializedInputs = null, serializedReferences = null, diff --git a/experimental/raft-tests/README.md b/experimental/raft-tests/README.md new file mode 100644 index 0000000000..ea43cf1c39 --- /dev/null +++ b/experimental/raft-tests/README.md @@ -0,0 +1,5 @@ +# Raft tests + +Testing the `RaftUniquenessProvider` provider requires the `java.nio` packaage to be open (the atomix library does reflection into +`java.nio.Bits`). This module has this package opened up to allow mock and unit tests to work. This is preferred over having `java.nio` open +in every module as this is an experimental feature. diff --git a/experimental/raft-tests/build.gradle b/experimental/raft-tests/build.gradle new file mode 100644 index 0000000000..1a6c9fc078 --- /dev/null +++ b/experimental/raft-tests/build.gradle @@ -0,0 +1,33 @@ +apply plugin: 'org.jetbrains.kotlin.jvm' +//apply plugin: 'net.corda.plugins.quasar-utils' + +description 'Raft tests' + +dependencies { +// testImplementation project(path: ':core', configuration: 'testArtifacts') + testImplementation project(":core") +// testImplementation project(":serialization") + testImplementation project(":node-api") + testImplementation project(":node") + testImplementation project(':core-test-utils') + testImplementation project(':test-utils') + testImplementation project(":node-driver") + testImplementation "junit:junit:$junit_version" +// testImplementation "org.junit.jupiter:junit-jupiter-api:$junit_jupiter_version" + testImplementation "org.assertj:assertj-core:$assertj_version" +// testImplementation "org.mockito.kotlin:mockito-kotlin:$mockito_kotlin_version" +// testImplementation "com.esotericsoftware:kryo:$kryo_version" +// testImplementation "com.google.guava:guava:$guava_version" +// testImplementation "io.netty:netty-common:$netty_version" +// testImplementation "org.slf4j:slf4j-api:$slf4j_version" + testImplementation "io.dropwizard.metrics:metrics-jmx:$metrics_version" + testImplementation 'io.atomix.copycat:copycat-client:1.2.3' + testImplementation 'io.atomix.copycat:copycat-server:1.2.3' +} + +test { + enabled = true // Something is disabling the test + jvmArgs += [ + "--add-opens=java.base/java.nio=ALL-UNNAMED" + ] +} diff --git a/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt b/experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt similarity index 96% rename from node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt rename to experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt index a794397fb1..5cebd1dbc3 100644 --- a/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt +++ b/experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt @@ -26,9 +26,8 @@ import net.corda.testing.internal.LogHelper import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties -import org.hamcrest.Matchers.instanceOf +import org.assertj.core.api.Assertions.assertThat import org.junit.After -import org.junit.Assert.assertThat import org.junit.Before import org.junit.Rule import org.junit.Test @@ -119,7 +118,7 @@ class RaftTransactionCommitLogTests { states, txId, requestingPartyName.toString(), requestSignature, timeWindow ) val commitError = client.submit(commitCommand).getOrThrow() - assertThat(commitError, instanceOf(NotaryError.TimeWindowInvalid::class.java)) + assertThat(commitError).isInstanceOf(NotaryError.TimeWindowInvalid::class.java) } @Test(timeout=300_000) @@ -158,7 +157,7 @@ class RaftTransactionCommitLogTests { val address = Address(myAddress.host, myAddress.port) val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(RaftNotarySchemaV1))) databases.add(database) - val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), { RaftUniquenessProvider.createMap(TestingNamedCacheFactory()) }) } + val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC()) { RaftUniquenessProvider.createMap(TestingNamedCacheFactory()) } } val server = CopycatServer.builder(address) .withStateMachine(stateMachineFactory) diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt b/experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft/UniquenessProviderTests.kt similarity index 99% rename from node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt rename to experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft/UniquenessProviderTests.kt index b94023b305..cdd9ecf184 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/UniquenessProviderTests.kt +++ b/experimental/raft-tests/src/test/kotlin/net/corda/notary/experimental/raft/UniquenessProviderTests.kt @@ -1,4 +1,4 @@ -package net.corda.node.services.transactions +package net.corda.notary.experimental.raft import com.codahale.metrics.MetricRegistry import net.corda.core.contracts.TimeWindow @@ -25,9 +25,6 @@ import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.notary.common.BatchSignature -import net.corda.notary.experimental.raft.RaftConfig -import net.corda.notary.experimental.raft.RaftNotarySchemaV1 -import net.corda.notary.experimental.raft.RaftUniquenessProvider import net.corda.notary.jpa.JPANotaryConfiguration import net.corda.notary.jpa.JPANotarySchemaV1 import net.corda.notary.jpa.JPAUniquenessProvider diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomIteratorSerializers.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomIteratorSerializers.kt deleted file mode 100644 index 193707dc0c..0000000000 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CustomIteratorSerializers.kt +++ /dev/null @@ -1,136 +0,0 @@ -package net.corda.nodeapi.internal.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import java.lang.reflect.Constructor -import java.lang.reflect.Field -import java.util.LinkedHashMap -import java.util.LinkedHashSet -import java.util.LinkedList - -/** - * The [LinkedHashMap] and [LinkedHashSet] have a problem with the default Quasar/Kryo serialisation - * in that serialising an iterator (and subsequent [LinkedHashMap.Entry]) over a sufficiently large - * data set can lead to a stack overflow (because the object map is traversed recursively). - * - * We've added our own custom serializer in order to ensure that the iterator is correctly deserialized. - */ -internal object LinkedHashMapIteratorSerializer : Serializer>() { - private val DUMMY_MAP = linkedMapOf(1L to 1) - private val outerMapField: Field = getIterator()::class.java.superclass.getDeclaredField("this$0").apply { isAccessible = true } - private val currentField: Field = getIterator()::class.java.superclass.getDeclaredField("current").apply { isAccessible = true } - - private val KEY_ITERATOR_CLASS: Class> = DUMMY_MAP.keys.iterator().javaClass - private val VALUE_ITERATOR_CLASS: Class> = DUMMY_MAP.values.iterator().javaClass - private val MAP_ITERATOR_CLASS: Class>> = DUMMY_MAP.iterator().javaClass - - fun getIterator(): Any = DUMMY_MAP.iterator() - - override fun write(kryo: Kryo, output: Output, obj: Iterator<*>) { - val current: Map.Entry<*, *>? = currentField.get(obj) as Map.Entry<*, *>? - kryo.writeClassAndObject(output, outerMapField.get(obj)) - kryo.writeClassAndObject(output, current) - } - - override fun read(kryo: Kryo, input: Input, type: Class>): Iterator<*> { - val outerMap = kryo.readClassAndObject(input) as Map<*, *> - return when (type) { - KEY_ITERATOR_CLASS -> { - val current = (kryo.readClassAndObject(input) as? Map.Entry<*, *>)?.key - outerMap.keys.iterator().returnToIteratorLocation(kryo, current) - } - VALUE_ITERATOR_CLASS -> { - val current = (kryo.readClassAndObject(input) as? Map.Entry<*, *>)?.value - outerMap.values.iterator().returnToIteratorLocation(kryo, current) - } - MAP_ITERATOR_CLASS -> { - val current = (kryo.readClassAndObject(input) as? Map.Entry<*, *>) - outerMap.iterator().returnToIteratorLocation(kryo, current) - } - else -> throw IllegalStateException("Invalid type") - } - } - - private fun Iterator<*>.returnToIteratorLocation(kryo: Kryo, current: Any?): Iterator<*> { - while (this.hasNext()) { - val key = this.next() - if (iteratedObjectsEqual(kryo, key, current)) break - } - return this - } - - private fun iteratedObjectsEqual(kryo: Kryo, a: Any?, b: Any?): Boolean = if (a == null || b == null) { - a == b - } else { - a === b || mapEntriesEqual(kryo, a, b) || kryoOptimisesAwayReferencesButEqual(kryo, a, b) - } - - /** - * Kryo can substitute brand new created instances for some types during deserialization, making the identity check fail. - * Fall back to equality for those. - */ - private fun kryoOptimisesAwayReferencesButEqual(kryo: Kryo, a: Any, b: Any) = - (!kryo.referenceResolver.useReferences(a.javaClass) && !kryo.referenceResolver.useReferences(b.javaClass) && a == b) - - private fun mapEntriesEqual(kryo: Kryo, a: Any, b: Any) = - (a is Map.Entry<*, *> && b is Map.Entry<*, *> && iteratedObjectsEqual(kryo, a.key, b.key)) -} - -/** - * The [LinkedHashMap] and [LinkedHashSet] have a problem with the default Quasar/Kryo serialisation - * in that serialising an iterator (and subsequent [LinkedHashMap.Entry]) over a sufficiently large - * data set can lead to a stack overflow (because the object map is traversed recursively). - * - * We've added our own custom serializer in order to ensure that only the key/value are recorded. - * The rest of the list isn't required at this scope. - */ -object LinkedHashMapEntrySerializer : Serializer>() { - // Create a dummy map so that we can get the LinkedHashMap$Entry from it - // The element type of the map doesn't matter. The entry is all we want - private val DUMMY_MAP = linkedMapOf(1L to 1) - fun getEntry(): Any = DUMMY_MAP.entries.first() - private val constr: Constructor<*> = getEntry()::class.java.declaredConstructors.single().apply { isAccessible = true } - - /** - * Kryo would end up serialising "this" entry, then serialise "this.after" recursively, leading to a very large stack. - * we'll skip that and just write out the key/value - */ - override fun write(kryo: Kryo, output: Output, obj: Map.Entry<*, *>) { - val e: Map.Entry<*, *> = obj - kryo.writeClassAndObject(output, e.key) - kryo.writeClassAndObject(output, e.value) - } - - override fun read(kryo: Kryo, input: Input, type: Class>): Map.Entry<*, *> { - val key = kryo.readClassAndObject(input) - val value = kryo.readClassAndObject(input) - return constr.newInstance(0, key, value, null) as Map.Entry<*, *> - } -} - -/** - * Also, add a [ListIterator] serializer to avoid more linked list issues. -*/ -object LinkedListItrSerializer : Serializer>() { - // Create a dummy list so that we can get the ListItr from it - // The element type of the list doesn't matter. The iterator is all we want - private val DUMMY_LIST = LinkedList(listOf(1)) - fun getListItr(): Any = DUMMY_LIST.listIterator() - - private val outerListField: Field = getListItr()::class.java.getDeclaredField("this$0").apply { isAccessible = true } - - override fun write(kryo: Kryo, output: Output, obj: ListIterator) { - kryo.writeClassAndObject(output, outerListField.get(obj)) - output.writeInt(obj.nextIndex()) - } - - override fun read(kryo: Kryo, input: Input, type: Class>): ListIterator { - val list = kryo.readClassAndObject(input) as LinkedList<*> - val index = input.readInt() - return list.listIterator(index) - } -} - - diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt index 3f80ba5200..340def31ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt @@ -6,10 +6,8 @@ import com.esotericsoftware.kryo.SerializerFactory import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.ClosureSerializer -import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer import com.esotericsoftware.kryo.serializers.FieldSerializer import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy -import de.javakaffee.kryoserializers.ArraysAsListSerializer import de.javakaffee.kryoserializers.BitSetSerializer import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer import de.javakaffee.kryoserializers.guava.ImmutableListSerializer @@ -20,7 +18,6 @@ import de.javakaffee.kryoserializers.guava.ImmutableSortedSetSerializer import net.corda.core.contracts.ContractAttachment import net.corda.core.contracts.ContractClassName import net.corda.core.contracts.PrivacySalt -import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.SecureHash import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.AbstractAttachment @@ -40,29 +37,20 @@ import net.corda.core.utilities.toNonEmptySet import net.corda.serialization.internal.DefaultWhitelist import net.corda.serialization.internal.GeneratedAttachment import net.corda.serialization.internal.MutableClassWhitelist -import net.i2p.crypto.eddsa.EdDSAPrivateKey -import net.i2p.crypto.eddsa.EdDSAPublicKey -import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPrivateKey -import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPublicKey -import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPrivateCrtKey -import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPublicKey -import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PrivateKey -import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PublicKey import org.objenesis.instantiator.ObjectInstantiator import org.objenesis.strategy.InstantiatorStrategy import org.objenesis.strategy.StdInstantiatorStrategy import org.slf4j.Logger -import java.io.BufferedInputStream import java.io.ByteArrayOutputStream -import java.io.FileInputStream import java.io.InputStream +import java.lang.invoke.SerializedLambda import java.lang.reflect.Modifier.isPublic +import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey import java.security.cert.CertPath import java.security.cert.X509Certificate import java.util.* -import kotlin.collections.ArrayList object DefaultKryoCustomizer { private val serializationWhitelists: List by lazy { @@ -71,11 +59,7 @@ object DefaultKryoCustomizer { fun customize(kryo: Kryo, publicKeySerializer: Serializer = PublicKeySerializer): Kryo { return kryo.apply { - isRegistrationRequired = false references = true - // Needed because of https://github.com/EsotericSoftware/kryo/issues/864 - setOptimizedGenerics(false) - val defaultFactoryConfig = FieldSerializer.FieldSerializerConfig() // Take the safest route here and allow subclasses to have fields named the same as super classes. defaultFactoryConfig.extendedFieldNames = true @@ -83,78 +67,60 @@ object DefaultKryoCustomizer { // For checkpoints we still want all the synthetic fields. This allows inner classes to reference // their parents after deserialization. defaultFactoryConfig.ignoreSyntheticFields = false - kryo.setDefaultSerializer(SerializerFactory.FieldSerializerFactory(defaultFactoryConfig)) + setDefaultSerializer(SerializerFactory.FieldSerializerFactory(defaultFactoryConfig)) instantiatorStrategy = CustomInstantiatorStrategy() - addDefaultSerializer(Iterator::class.java, object : SerializerFactory.BaseSerializerFactory() { - override fun newSerializer(kryo: Kryo, type: Class<*>): IteratorSerializer { - val config = CompatibleFieldSerializer.CompatibleFieldSerializerConfig().apply { - ignoreSyntheticFields = false - extendedFieldNames = true - } - return IteratorSerializer(type, CompatibleFieldSerializer(kryo, type, config)) - } - }) - - // Required for HashCheckingStream (de)serialization. - // Note that return type should be specifically set to InputStream, otherwise it may not work, - // i.e. val aStream : InputStream = HashCheckingStream(...). + addDefaultSerializer(Iterator::class.java, IteratorSerializerFactory) addDefaultSerializer(InputStream::class.java, InputStreamSerializer) addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer()) addDefaultSerializer(Logger::class.java, LoggerSerializer) addDefaultSerializer(X509Certificate::class.java, X509CertificateSerializer) + addDefaultSerializer(CertPath::class.java, CertPathSerializer) + addDefaultSerializer(PrivateKey::class.java, PrivateKeySerializer) + addDefaultSerializer(PublicKey::class.java, publicKeySerializer) // WARNING: reordering the registrations here will cause a change in the serialized form, since classes // with custom serializers get written as registration ids. This will break backwards-compatibility. // Please add any new registrations to the end. - addDefaultSerializer(LinkedHashMapIteratorSerializer.getIterator()::class.java.superclass, LinkedHashMapIteratorSerializer) - register(LinkedHashMapEntrySerializer.getEntry()::class.java, LinkedHashMapEntrySerializer) - register(LinkedListItrSerializer.getListItr()::class.java, LinkedListItrSerializer) - register(Arrays.asList("").javaClass, ArraysAsListSerializer()) + registerIfPackageOpen(linkedMapOf(1 to 1).entries.first()::class.java, { LinkedHashMapEntrySerializer }, fallbackWrite = false) register(LazyMappedList::class.java, LazyMappedListSerializer) register(SignedTransaction::class.java, SignedTransactionSerializer) register(WireTransaction::class.java, WireTransactionSerializer) register(SerializedBytes::class.java, SerializedBytesSerializer) - UnmodifiableCollectionsSerializer.registerSerializers(this) + if (Collections::class.java.isPackageOpen) { + UnmodifiableCollectionsSerializer.registerSerializers(this) + } else { + registerAsInaccessible(Collections.unmodifiableCollection(listOf("")).javaClass) + registerAsInaccessible(Collections.unmodifiableList(ArrayList()).javaClass) + registerAsInaccessible(Collections.unmodifiableList(LinkedList()).javaClass) + registerAsInaccessible(Collections.unmodifiableSet(HashSet()).javaClass) + registerAsInaccessible(Collections.unmodifiableSortedSet(TreeSet()).javaClass) + registerAsInaccessible(Collections.unmodifiableMap(HashMap()).javaClass) + registerAsInaccessible(Collections.unmodifiableSortedMap(TreeMap()).javaClass) + } ImmutableListSerializer.registerSerializers(this) ImmutableSetSerializer.registerSerializers(this) ImmutableSortedSetSerializer.registerSerializers(this) ImmutableMapSerializer.registerSerializers(this) ImmutableMultimapSerializer.registerSerializers(this) - // InputStream subclasses whitelisting, required for attachments. - register(BufferedInputStream::class.java, InputStreamSerializer) - register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer) - register(PublicKey::class.java, publicKeySerializer) - register(PrivateKey::class.java, PrivateKeySerializer) - register(EdDSAPublicKey::class.java, publicKeySerializer) - register(EdDSAPrivateKey::class.java, PrivateKeySerializer) - register(CompositeKey::class.java, publicKeySerializer) // Using a custom serializer for compactness // Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway. register(Array::class, read = { _, _ -> emptyArray() }, write = { _, _, _ -> }) // This ensures a NonEmptySetSerializer is constructed with an initial value. register(NonEmptySet::class.java, NonEmptySetSerializer) register(BitSet::class.java, BitSetSerializer()) register(Class::class.java, ClassSerializer) - register(FileInputStream::class.java, InputStreamSerializer) - register(CertPath::class.java, CertPathSerializer) - register(BCECPrivateKey::class.java, PrivateKeySerializer) - register(BCECPublicKey::class.java, publicKeySerializer) - register(BCRSAPrivateCrtKey::class.java, PrivateKeySerializer) - register(BCRSAPublicKey::class.java, publicKeySerializer) - register(BCSphincs256PrivateKey::class.java, PrivateKeySerializer) - register(BCSphincs256PublicKey::class.java, publicKeySerializer) register(NotaryChangeWireTransaction::class.java, NotaryChangeWireTransactionSerializer) register(PartyAndCertificate::class.java, PartyAndCertificateSerializer) // Don't deserialize PrivacySalt via its default constructor. register(PrivacySalt::class.java, PrivacySaltSerializer) - + register(KeyPair::class.java, KeyPairSerializer) // Used by the remote verifier, and will possibly be removed in future. register(ContractAttachment::class.java, ContractAttachmentSerializer) - register(java.lang.invoke.SerializedLambda::class.java) + registerIfPackageOpen(SerializedLambda::class.java, fallbackWrite = false) register(ClosureSerializer.Closure::class.java, CordaClosureSerializer) register(ContractUpgradeWireTransaction::class.java, ContractUpgradeWireTransactionSerializer) register(ContractUpgradeFilteredTransaction::class.java, ContractUpgradeFilteredTransactionSerializer) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializer.kt deleted file mode 100644 index d618251e37..0000000000 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializer.kt +++ /dev/null @@ -1,52 +0,0 @@ -package net.corda.nodeapi.internal.serialization.kryo - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import java.lang.reflect.Field - -class IteratorSerializer(type: Class<*>, private val serializer: Serializer>) : Serializer>(false, false) { - - private val iterableReferenceField = findField(type, "this\$0")?.apply { isAccessible = true } - private val expectedModCountField = findField(type, "expectedModCount")?.apply { isAccessible = true } - private val iterableReferenceFieldType = iterableReferenceField?.type - private val modCountField = when (iterableReferenceFieldType) { - null -> null - else -> findField(iterableReferenceFieldType, "modCount")?.apply { isAccessible = true } - } - - override fun write(kryo: Kryo, output: Output, obj: Iterator<*>) { - serializer.write(kryo, output, obj) - } - - override fun read(kryo: Kryo, input: Input, type: Class>): Iterator<*> { - val iterator = serializer.read(kryo, input, type) - return fixIterator(iterator) - } - - private fun fixIterator(iterator: Iterator<*>) : Iterator<*> { - - // Set expectedModCount of iterator - val iterableInstance = iterableReferenceField?.get(iterator) ?: return iterator - val modCountValue = modCountField?.getInt(iterableInstance) ?: return iterator - expectedModCountField?.setInt(iterator, modCountValue) - - return iterator - } - - /** - * Find field in clazz or any superclass - */ - private fun findField(clazz: Class<*>, fieldName: String): Field? { - return clazz.declaredFields.firstOrNull { x -> x.name == fieldName } ?: when { - clazz.superclass != null -> { - // Look in superclasses - findField(clazz.superclass, fieldName) - } - else -> null // Not found - } - } -} - - diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializerFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializerFactory.kt new file mode 100644 index 0000000000..cdbc85f1ad --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/IteratorSerializerFactory.kt @@ -0,0 +1,64 @@ +package net.corda.nodeapi.internal.serialization.kryo + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.SerializerFactory +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer +import net.corda.core.internal.fullyQualifiedPackage +import java.util.Collections +import java.util.LinkedList + +object IteratorSerializerFactory : SerializerFactory.BaseSerializerFactory>() { + private val linkedListListIteratorClass = LinkedList().listIterator()::class.java + + override fun newSerializer(kryo: Kryo, type: Class<*>): Serializer> { + return when { + !type.isPackageOpen -> FallbackEmptyIteratorSerializer + type == linkedListListIteratorClass -> LinkedListListIteratorSerializer + else -> { + val config = CompatibleFieldSerializer.CompatibleFieldSerializerConfig().apply { + ignoreSyntheticFields = false + extendedFieldNames = true + } + CompatibleFieldSerializer(kryo, type, config) + } + } + } + + private object LinkedListListIteratorSerializer : Serializer>() { + private val outerListField = linkedListListIteratorClass.getDeclaredField("this$0").apply { isAccessible = true } + + override fun write(kryo: Kryo, output: Output, obj: ListIterator<*>) { + kryo.writeClassAndObject(output, outerListField.get(obj)) + output.writeInt(obj.nextIndex()) + } + + override fun read(kryo: Kryo, input: Input, type: Class>): ListIterator<*> { + val list = kryo.readClassAndObject(input) as LinkedList<*> + val index = input.readInt() + return list.listIterator(index) + } + } + + private object FallbackEmptyIteratorSerializer : Serializer>() { + override fun write(kryo: Kryo, output: Output, obj: Iterator<*>) { + val hasNext = obj.hasNext() + output.writeBoolean(hasNext) + } + + override fun read(kryo: Kryo, input: Input, type: Class>): Iterator<*> { + val hasNext = input.readBoolean() + if (hasNext) { + throw UnsupportedOperationException("Restoring checkpoints containing iterators is not supported in this test environment. " + + "If you wish to restore these checkpoints in your tests then use the out-of-process node driver, or add " + + "--add-opens=${type.fullyQualifiedPackage}=ALL-UNNAMED to the test JVM args.") + } else { + // If the iterator didn't have any elements left (which can happen commonly when iterating over a singleton collection) then + // there's no need to make a fuss. We can return an empty iterator and move on. + return Collections.emptyIterator() + } + } + } +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt index 6cd1015085..20ece864fc 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt @@ -1,8 +1,6 @@ package net.corda.nodeapi.internal.serialization.kryo -import com.esotericsoftware.kryo.ClassResolver import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.SerializerFactory @@ -10,13 +8,13 @@ import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer import com.esotericsoftware.kryo.serializers.FieldSerializer -import com.esotericsoftware.kryo.util.MapReferenceResolver import net.corda.core.contracts.PrivacySalt import net.corda.core.crypto.Crypto import net.corda.core.crypto.DigestService import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.internal.LazyMappedList +import net.corda.core.internal.fullyQualifiedPackage import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SerializedBytes @@ -28,25 +26,21 @@ import net.corda.core.transactions.NotaryChangeWireTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.SgxSupport +import net.corda.core.utilities.contextLogger import net.corda.serialization.internal.serializationContextKey import org.slf4j.Logger import org.slf4j.LoggerFactory import java.io.InputStream -import java.lang.reflect.InvocationTargetException +import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey import java.security.cert.CertPath import java.security.cert.CertificateFactory import java.security.cert.X509Certificate +import java.util.Collections +import java.util.concurrent.ConcurrentHashMap import javax.annotation.concurrent.ThreadSafe import kotlin.reflect.KClass -import kotlin.reflect.KMutableProperty -import kotlin.reflect.KParameter -import kotlin.reflect.full.memberProperties -import kotlin.reflect.full.primaryConstructor -import kotlin.reflect.jvm.isAccessible -import kotlin.reflect.jvm.javaType /** * Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead @@ -72,94 +66,6 @@ object SerializedBytesSerializer : Serializer>() { } } -/** - * Serializes properties and deserializes by using the constructor. This assumes that all backed properties are - * set via the constructor and the class is immutable. - */ -class ImmutableClassSerializer(val klass: KClass) : Serializer() { - val props by lazy { klass.memberProperties.sortedBy { it.name } } - val propsByName by lazy { props.associateBy { it.name } } - val constructor by lazy { klass.primaryConstructor!! } - - init { - // Verify that this class is immutable (all properties are final). - // We disable this check inside SGX as the reflection blows up. - if (!SgxSupport.isInsideEnclave) { - props.forEach { - require(it !is KMutableProperty<*>) { "$it mutable property of class: ${klass} is unsupported" } - } - } - } - - // Just a utility to help us catch cases where nodes are running out of sync versions. - private fun hashParameters(params: List): Int { - return params.map { - (it.name ?: "") + it.index.toString() + it.type.javaType.typeName - }.hashCode() - } - - override fun write(kryo: Kryo, output: Output, obj: T) { - output.writeVarInt(constructor.parameters.size, true) - output.writeInt(hashParameters(constructor.parameters)) - for (param in constructor.parameters) { - val kProperty = propsByName[param.name!!]!! - kProperty.isAccessible = true - when (param.type.javaType.typeName) { - "int" -> output.writeVarInt(kProperty.get(obj) as Int, true) - "long" -> output.writeVarLong(kProperty.get(obj) as Long, true) - "short" -> output.writeShort(kProperty.get(obj) as Int) - "char" -> output.writeChar(kProperty.get(obj) as Char) - "byte" -> output.writeByte(kProperty.get(obj) as Byte) - "double" -> output.writeDouble(kProperty.get(obj) as Double) - "float" -> output.writeFloat(kProperty.get(obj) as Float) - "boolean" -> output.writeBoolean(kProperty.get(obj) as Boolean) - else -> try { - kryo.writeClassAndObject(output, kProperty.get(obj)) - } catch (e: Exception) { - throw IllegalStateException("Failed to serialize ${param.name} in ${klass.qualifiedName}", e) - } - } - } - } - - @Suppress("ComplexMethod") - override fun read(kryo: Kryo, input: Input, type: Class): T { - require(type.kotlin == klass) - val numFields = input.readVarInt(true) - val fieldTypeHash = input.readInt() - - // A few quick checks for data evolution. Note that this is not guaranteed to catch every problem! But it's - // good enough for a prototype. - if (numFields != constructor.parameters.size) - throw KryoException("Mismatch between number of constructor parameters and number of serialised fields " + - "for ${klass.qualifiedName} ($numFields vs ${constructor.parameters.size})") - if (fieldTypeHash != hashParameters(constructor.parameters)) - throw KryoException("Hashcode mismatch for parameter types for ${klass.qualifiedName}: unsupported type evolution has happened.") - - val args = arrayOfNulls(numFields) - var cursor = 0 - for (param in constructor.parameters) { - args[cursor++] = when (param.type.javaType.typeName) { - "int" -> input.readVarInt(true) - "long" -> input.readVarLong(true) - "short" -> input.readShort() - "char" -> input.readChar() - "byte" -> input.readByte() - "double" -> input.readDouble() - "float" -> input.readFloat() - "boolean" -> input.readBoolean() - else -> kryo.readClassAndObject(input) - } - } - // If the constructor throws an exception, pass it through instead of wrapping it. - return try { - constructor.call(*args) - } catch (e: InvocationTargetException) { - throw e.cause!! - } - } -} - // TODO This is a temporary inefficient serializer for sending InputStreams through RPC. This may be done much more // efficiently using Artemis's large message feature. object InputStreamSerializer : Serializer() { @@ -187,7 +93,7 @@ object InputStreamSerializer : Serializer() { chunks.add(chunk) } } - val flattened = ByteArray(chunks.sumBy { it.size }) + val flattened = ByteArray(chunks.sumOf { it.size }) var offset = 0 for (chunk in chunks) { System.arraycopy(chunk, 0, flattened, offset, chunk.size) @@ -198,16 +104,6 @@ object InputStreamSerializer : Serializer() { } -inline fun Kryo.useClassLoader(cl: ClassLoader, body: () -> T): T { - val tmp = this.classLoader ?: ClassLoader.getSystemClassLoader() - this.classLoader = cl - try { - return body() - } finally { - this.classLoader = tmp - } -} - fun Output.writeBytesWithLength(byteArray: ByteArray) { this.writeInt(byteArray.size, true) this.writeBytes(byteArray) @@ -301,8 +197,8 @@ object PrivateKeySerializer : Serializer() { } override fun read(kryo: Kryo, input: Input, type: Class): PrivateKey { - val A = input.readBytesWithLength() - return Crypto.decodePrivateKey(A) + val encodedKey = input.readBytesWithLength() + return Crypto.decodePrivateKey(encodedKey) } } @@ -315,63 +211,8 @@ object PublicKeySerializer : Serializer() { } override fun read(kryo: Kryo, input: Input, type: Class): PublicKey { - val A = input.readBytesWithLength() - return Crypto.decodePublicKey(A) - } -} - -/** - * Helper function for reading lists with number of elements at the beginning. - * @param minLen minimum number of elements we expect for list to include, defaults to 1 - * @param expectedLen expected length of the list, defaults to null if arbitrary length list read - */ -inline fun readListOfLength(kryo: Kryo, input: Input, minLen: Int = 1, expectedLen: Int? = null): List { - val elemCount = input.readInt() - if (elemCount < minLen) throw KryoException("Cannot deserialize list, too little elements. Minimum required: $minLen, got: $elemCount") - if (expectedLen != null && elemCount != expectedLen) - throw KryoException("Cannot deserialize list, expected length: $expectedLen, got: $elemCount.") - return (1..elemCount).map { kryo.readClassAndObject(input) as T } -} - -/** - * We need to disable whitelist checking during calls from our Kryo code to register a serializer, since it checks - * for existing registrations and then will enter our [CordaClassResolver.getRegistration] method. - */ -open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapReferenceResolver()) { - override fun register(type: Class<*>?): Registration { - (classResolver as? CordaClassResolver)?.disableWhitelist() - try { - return super.register(type) - } finally { - (classResolver as? CordaClassResolver)?.enableWhitelist() - } - } - - override fun register(type: Class<*>?, id: Int): Registration { - (classResolver as? CordaClassResolver)?.disableWhitelist() - try { - return super.register(type, id) - } finally { - (classResolver as? CordaClassResolver)?.enableWhitelist() - } - } - - override fun register(type: Class<*>?, serializer: Serializer<*>?): Registration { - (classResolver as? CordaClassResolver)?.disableWhitelist() - try { - return super.register(type, serializer) - } finally { - (classResolver as? CordaClassResolver)?.enableWhitelist() - } - } - - override fun register(registration: Registration?): Registration { - (classResolver as? CordaClassResolver)?.disableWhitelist() - try { - return super.register(registration) - } finally { - (classResolver as? CordaClassResolver)?.enableWhitelist() - } + val encodedKey = input.readBytesWithLength() + return Crypto.decodePublicKey(encodedKey) } } @@ -388,23 +229,60 @@ inline fun Kryo.register( ) } +internal val Class<*>.isPackageOpen: Boolean get() = module.isOpen(packageName, KryoCheckpointSerializer::class.java.module) + /** - * Use this method to mark any types which can have the same instance within it more than once. This will make sure - * the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic - * references will throw a stack overflow exception during serialisation. + * */ -inline fun Kryo.noReferencesWithin() { - register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java))) +fun Kryo.registerIfPackageOpen(type: Class<*>, createSerializer: () -> Serializer<*>, fallbackWrite: Boolean = true) { + val serializer = if (type.isPackageOpen) createSerializer() else serializerForInaccesible(type, fallbackWrite) + register(type, serializer) } -class NoReferencesSerializer(private val baseSerializer: Serializer) : Serializer() { +/** + * + */ +fun Kryo.registerIfPackageOpen(type: Class<*>, fallbackWrite: Boolean = true) { + if (type.isPackageOpen) { + register(type) + } else { + registerAsInaccessible(type, fallbackWrite) + } +} - override fun read(kryo: Kryo, input: Input, type: Class): T { - return kryo.withoutReferences { baseSerializer.read(kryo, input, type) } +/** + * + */ +fun Kryo.registerAsInaccessible(type: Class<*>, fallbackWrite: Boolean = true) { + register(type, serializerForInaccesible(type, fallbackWrite)) +} + +private fun Kryo.serializerForInaccesible(type: Class<*>, fallbackWrite: Boolean = true): Serializer<*> { + // Find the most specific serializer already registered to use for writing. This will be useful to make sure as much of the object + // graph is serialised and covered in the writing phase. + return InaccessibleSerializer(if (fallbackWrite) getSerializer(type) else null) +} + + +private class InaccessibleSerializer(private val fallbackWrite: Serializer? = null) : Serializer() { + companion object { + private val logger = contextLogger() + private val typesLogged = Collections.newSetFromMap>(ConcurrentHashMap()) } override fun write(kryo: Kryo, output: Output, obj: T) { - kryo.withoutReferences { baseSerializer.write(kryo, output, obj) } + val type = obj.javaClass + if (typesLogged.add(type)) { + logger.warn("${type.fullyQualifiedPackage} is not open to this test environment and so ${type.name} objects are not " + + "supported in checkpoints. This will most likely not be an issue unless checkpoints are restored.") + } + fallbackWrite?.write(kryo, output, obj) + } + + override fun read(kryo: Kryo, input: Input, type: Class): T { + throw UnsupportedOperationException("Restoring checkpoints containing ${type.name} objects is not supported in this test " + + "environment. If you wish to restore these checkpoints in your tests then use the out-of-process node driver, or add " + + "--add-opens=${type.fullyQualifiedPackage}=ALL-UNNAMED to the test JVM args.") } } @@ -490,7 +368,8 @@ class ThrowableSerializer(kryo: Kryo, type: Class) : Serializer } } - private val delegate: Serializer = uncheckedCast(SerializerFactory.ReflectionSerializerFactory.newSerializer(kryo, FieldSerializer::class.java, type)) as Serializer + @Suppress("UNCHECKED_CAST") + private val delegate: Serializer = SerializerFactory.ReflectionSerializerFactory.newSerializer(kryo, FieldSerializer::class.java, type) as Serializer override fun write(kryo: Kryo, output: Output, throwable: Throwable) { delegate.write(kryo, output, throwable) @@ -509,9 +388,22 @@ class ThrowableSerializer(kryo: Kryo, type: Class) : Serializer /** For serializing the utility [LazyMappedList]. It will serialize the fully resolved object.*/ @ThreadSafe -@SuppressWarnings("ALL") object LazyMappedListSerializer : Serializer>() { // Using a MutableList so that Kryo will always write an instance of java.util.ArrayList. override fun write(kryo: Kryo, output: Output, obj: List<*>) = kryo.writeClassAndObject(output, obj.toMutableList()) override fun read(kryo: Kryo, input: Input, type: Class>) = kryo.readClassAndObject(input) as? List<*> } + +object KeyPairSerializer : Serializer() { + override fun write(kryo: Kryo, output: Output, obj: KeyPair) { + kryo.writeObject(output, obj.public) + kryo.writeObject(output, obj.private) + } + + override fun read(kryo: Kryo, input: Input, type: Class): KeyPair { + return KeyPair( + kryo.readObject(input, PublicKey::class.java), + kryo.readObject(input, PrivateKey::class.java) + ) + } +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt index dd67326e53..a61bc7c8ee 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointSerializer.kt @@ -1,13 +1,21 @@ package net.corda.nodeapi.internal.serialization.kryo import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.io.serialization.kryo.CollectionsSetFromMapSerializer +import co.paralleluniverse.io.serialization.kryo.ExternalizableKryoSerializer +import co.paralleluniverse.io.serialization.kryo.JdkProxySerializer import co.paralleluniverse.io.serialization.kryo.KryoSerializer +import co.paralleluniverse.io.serialization.kryo.ReferenceSerializer import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.ClosureSerializer +import com.esotericsoftware.kryo.serializers.DefaultSerializers +import com.esotericsoftware.kryo.util.MapReferenceResolver +import de.javakaffee.kryoserializers.GregorianCalendarSerializer +import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.CheckpointCustomSerializer import net.corda.core.serialization.ClassWhitelist @@ -25,7 +33,24 @@ import net.corda.serialization.internal.CordaSerializationMagic import net.corda.serialization.internal.QuasarWhitelist import net.corda.serialization.internal.SectionId import net.corda.serialization.internal.encodingNotPermittedFormat +import java.io.Externalizable +import java.lang.ref.Reference +import java.lang.reflect.InvocationHandler +import java.net.URI +import java.util.Collections +import java.util.EnumMap +import java.util.EnumSet +import java.util.GregorianCalendar +import java.util.LinkedList +import java.util.TreeMap +import java.util.TreeSet +import java.util.UUID import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong +import java.util.regex.Pattern val kryoMagic = CordaSerializationMagic("corda".toByteArray() + byteArrayOf(0, 0)) @@ -40,20 +65,27 @@ private object AutoCloseableSerialisationDetector : Serializer() override fun read(kryo: Kryo, input: Input, type: Class) = throw IllegalStateException("Should not reach here!") } +private object FutureSerialisationDetector : Serializer>() { + override fun write(kryo: Kryo, output: Output, future: Future<*>) { + val message = "${future.javaClass.name}, which is a Future, has been detected during flow checkpointing. " + + "Restoring Futures across node restarts is not supported. Make sure code accessing it is " + + "confined to a private method or the reference is nulled out." + throw UnsupportedOperationException(message) + } + + override fun read(kryo: Kryo, input: Input, type: Class>) = throw IllegalStateException("Should not reach here!") +} + object KryoCheckpointSerializer : CheckpointSerializer { private val kryoPoolsForContexts = ConcurrentHashMap>>, KryoPool>() + private fun getPool(context: CheckpointSerializationContext): KryoPool { return kryoPoolsForContexts.computeIfAbsent(Triple(context.whitelist, context.deserializationClassLoader, context.checkpointCustomSerializers)) { KryoPool { - val classResolver = CordaClassResolver(context) - val serializer = Fiber.getFiberSerializer(classResolver,false) as KryoSerializer - // TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that - val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true } + val serializer = createFiberSerializer(context) serializer.kryo.apply { - field.set(this, classResolver) - // don't allow overriding the public key serializer for checkpointing - DefaultKryoCustomizer.customize(this) addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector) + addDefaultSerializer(Future::class.java, FutureSerialisationDetector) register(ClosureSerializer.Closure::class.java, CordaClosureSerializer) classLoader = it.second @@ -62,12 +94,75 @@ object KryoCheckpointSerializer : CheckpointSerializer { warnAboutDuplicateSerializers(customSerializers) val classToSerializer = mapInputClassToCustomSerializer(context.deserializationClassLoader, customSerializers) addDefaultCustomSerializers(this, classToSerializer) - referenceResolver + registerCommonClasses(this) } } } } + fun createFiberSerializer(context: CheckpointSerializationContext): KryoSerializer { +// val serializer = Fiber.getFiberSerializer(classResolver, false) as KryoSerializer + // (this as ReplaceableObjectKryo).isIgnoreInaccessibleClasses = true + val kryo = Kryo(CordaClassResolver(context), MapReferenceResolver()) + kryo.isRegistrationRequired = false + // Needed because of https://github.com/EsotericSoftware/kryo/issues/864 + kryo.setOptimizedGenerics(false) + DefaultKryoCustomizer.customize(kryo) + return Fiber.getFiberSerializer(kryo, false) as KryoSerializer + } + + /** + * Copy of [co.paralleluniverse.io.serialization.kryo.KryoUtil.registerCommonClasses] ... + */ + fun registerCommonClasses(kryo: Kryo) { + kryo.register(BooleanArray::class.java) + kryo.register(ByteArray::class.java) + kryo.register(ShortArray::class.java) + kryo.register(CharArray::class.java) + kryo.register(IntArray::class.java) + kryo.register(FloatArray::class.java) + kryo.register(LongArray::class.java) + kryo.register(DoubleArray::class.java) + kryo.register(Array::class.java) + kryo.register(Array::class.java) + kryo.register(ArrayList::class.java) + kryo.register(LinkedList::class.java) + kryo.register(HashMap::class.java) + kryo.register(LinkedHashMap::class.java) + kryo.register(TreeMap::class.java) + kryo.register(EnumMap::class.java) + kryo.register(HashSet::class.java) + kryo.register(LinkedHashSet::class.java) + kryo.register(TreeSet::class.java) + kryo.register(EnumSet::class.java) + + kryo.registerIfPackageOpen(Collections.newSetFromMap(emptyMap()).javaClass, ::CollectionsSetFromMapSerializer) + if (GregorianCalendar::class.java.isPackageOpen) { + // If possible register a more efficient serializer for GregorianCalendar, otherwise a default serializer is already registered. + kryo.register(GregorianCalendar::class.java, GregorianCalendarSerializer()) + } + kryo.register(InvocationHandler::class.java, JdkProxySerializer()) + if (Collections::class.java.isPackageOpen) { + SynchronizedCollectionsSerializer.registerSerializers(kryo) + } else { + kryo.registerAsInaccessible(Collections.synchronizedCollection(listOf(1)).javaClass) + kryo.registerAsInaccessible(Collections.synchronizedList(ArrayList()).javaClass) + kryo.registerAsInaccessible(Collections.synchronizedList(LinkedList()).javaClass) + kryo.registerAsInaccessible(Collections.synchronizedSet(HashSet()).javaClass) + kryo.registerAsInaccessible(Collections.synchronizedSortedSet(TreeSet()).javaClass) + kryo.registerAsInaccessible(Collections.synchronizedMap(HashMap()).javaClass) + kryo.registerAsInaccessible(Collections.synchronizedSortedMap(TreeMap()).javaClass) + } + kryo.addDefaultSerializer(Externalizable::class.java, ExternalizableKryoSerializer()) + kryo.addDefaultSerializer(Reference::class.java, ReferenceSerializer()) + kryo.addDefaultSerializer(URI::class.java, DefaultSerializers.URISerializer::class.java) + kryo.addDefaultSerializer(UUID::class.java, DefaultSerializers.UUIDSerializer::class.java) + kryo.addDefaultSerializer(AtomicBoolean::class.java, DefaultSerializers.AtomicBooleanSerializer::class.java) + kryo.addDefaultSerializer(AtomicInteger::class.java, DefaultSerializers.AtomicIntegerSerializer::class.java) + kryo.addDefaultSerializer(AtomicLong::class.java, DefaultSerializers.AtomicLongSerializer::class.java) + kryo.addDefaultSerializer(Pattern::class.java, DefaultSerializers.PatternSerializer::class.java) + } + /** * Returns a sorted list of CustomSerializerCheckpointAdaptor based on the custom serializers inside context. * diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreams.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreams.kt index 80591ebc07..e60e4aaf28 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreams.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoStreams.kt @@ -13,7 +13,7 @@ import java.io.SequenceInputStream private val serializationBufferPool = LazyPool( newInstance = { ByteArray(64 * 1024) }) -internal fun kryoInput(underlying: InputStream, task: Input.() -> T): T { +fun kryoInput(underlying: InputStream, task: Input.() -> T): T { return serializationBufferPool.run { Input(it).use { input -> input.inputStream = underlying @@ -22,7 +22,7 @@ internal fun kryoInput(underlying: InputStream, task: Input.() -> T): T { } } -internal fun kryoOutput(task: Output.() -> T): ByteArray { +fun kryoOutput(task: Output.() -> T): ByteArray { return byteArrayOutput { underlying -> serializationBufferPool.run { Output(it).use { output -> @@ -33,11 +33,11 @@ internal fun kryoOutput(task: Output.() -> T): ByteArray { } } -internal fun Output.substitute(transform: (OutputStream) -> OutputStream) { +fun Output.substitute(transform: (OutputStream) -> OutputStream) { flush() outputStream = transform(outputStream) } -internal fun Input.substitute(transform: (InputStream) -> InputStream) { +fun Input.substitute(transform: (InputStream) -> InputStream) { inputStream = transform(SequenceInputStream(buffer.copyOfRange(position(), limit()).inputStream(), inputStream)) } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/LinkedHashMapEntrySerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/LinkedHashMapEntrySerializer.kt new file mode 100644 index 0000000000..faf43b3dde --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/LinkedHashMapEntrySerializer.kt @@ -0,0 +1,36 @@ +package net.corda.nodeapi.internal.serialization.kryo + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output + +/** + * The [LinkedHashMap] and [LinkedHashSet] have a problem with the default Quasar/Kryo serialisation + * in that serialising an iterator (and subsequent [LinkedHashMap.Entry]) over a sufficiently large + * data set can lead to a stack overflow (because the object map is traversed recursively). + * + * We've added our own custom serializer in order to ensure that only the key/value are recorded. + * The rest of the list isn't required at this scope. + */ +object LinkedHashMapEntrySerializer : Serializer>() { + // Create a dummy map so that we can get the LinkedHashMap$Entry from it + // The element type of the map doesn't matter. The entry is all we want + private val constructor = linkedMapOf(1L to 1).entries.first()::class.java.declaredConstructors.single().apply { isAccessible = true } + + /** + * Kryo would end up serialising "this" entry, then serialise "this.after" recursively, leading to a very large stack. + * we'll skip that and just write out the key/value + */ + override fun write(kryo: Kryo, output: Output, obj: Map.Entry<*, *>) { + val e: Map.Entry<*, *> = obj + kryo.writeClassAndObject(output, e.key) + kryo.writeClassAndObject(output, e.value) + } + + override fun read(kryo: Kryo, input: Input, type: Class>): Map.Entry<*, *> { + val key = kryo.readClassAndObject(input) + val value = kryo.readClassAndObject(input) + return constructor.newInstance(0, key, value, null) as Map.Entry<*, *> + } +} diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/ArrayListItrConcurrentModificationException.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/ArrayListItrConcurrentModificationException.kt deleted file mode 100644 index 0543e7b4eb..0000000000 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/ArrayListItrConcurrentModificationException.kt +++ /dev/null @@ -1,122 +0,0 @@ -package net.corda.nodeapi.internal.serialization.kryo - -import org.mockito.kotlin.doReturn -import org.mockito.kotlin.whenever -import net.corda.core.serialization.EncodingWhitelist -import net.corda.core.serialization.internal.CheckpointSerializationContext -import net.corda.core.serialization.internal.checkpointDeserialize -import net.corda.core.serialization.internal.checkpointSerialize -import net.corda.coretesting.internal.rigorousMock -import net.corda.serialization.internal.AllWhitelist -import net.corda.serialization.internal.CheckpointSerializationContextImpl -import net.corda.serialization.internal.CordaSerializationEncoding -import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule -import org.assertj.core.api.Assertions.assertThat -import org.junit.Before -import org.junit.Rule -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.runners.Parameterized.Parameters -import java.util.* -import kotlin.collections.ArrayList -import kotlin.collections.HashMap -import kotlin.collections.HashSet -import kotlin.collections.LinkedHashMap -import kotlin.collections.LinkedHashSet - -@RunWith(Parameterized::class) -class ArrayListItrConcurrentModificationException(private val compression: CordaSerializationEncoding?) { - companion object { - @Parameters(name = "{0}") - @JvmStatic - fun compression() = arrayOf(null) + CordaSerializationEncoding.values() - } - - @get:Rule - val serializationRule = CheckpointSerializationEnvironmentRule(inheritable = true) - private lateinit var context: CheckpointSerializationContext - - @Before - fun setup() { - context = CheckpointSerializationContextImpl( - deserializationClassLoader = javaClass.classLoader, - whitelist = AllWhitelist, - properties = emptyMap(), - objectReferencesEnabled = true, - encoding = compression, - encodingWhitelist = rigorousMock().also { - if (compression != null) doReturn(true).whenever(it).acceptEncoding(compression) - }) - } - - @Test(timeout=300_000) - fun `ArrayList iterator can checkpoint without error`() { - runTestWithCollection(ArrayList()) - } - - @Test(timeout=300_000) - fun `HashSet iterator can checkpoint without error`() { - runTestWithCollection(HashSet()) - } - - @Test(timeout=300_000) - fun `LinkedHashSet iterator can checkpoint without error`() { - runTestWithCollection(LinkedHashSet()) - } - - @Test(timeout=300_000) - fun `HashMap iterator can checkpoint without error`() { - runTestWithCollection(HashMap()) - } - - @Test(timeout=300_000) - fun `LinkedHashMap iterator can checkpoint without error`() { - runTestWithCollection(LinkedHashMap()) - } - - @Test(timeout=300_000) - fun `LinkedList iterator can checkpoint without error`() { - runTestWithCollection(LinkedList()) - } - - private data class TestCheckpoint(val list: C, val iterator: I) - - private fun runTestWithCollection(collection: MutableCollection) { - - for (i in 1..100) { - collection.add(i) - } - - val iterator = collection.iterator() - iterator.next() - - val checkpoint = TestCheckpoint(collection, iterator) - - val serializedBytes = checkpoint.checkpointSerialize(context) - val deserializedCheckpoint = serializedBytes.checkpointDeserialize(context) - - assertThat(deserializedCheckpoint.list).isEqualTo(collection) - assertThat(deserializedCheckpoint.iterator.next()).isEqualTo(2) - assertThat(deserializedCheckpoint.iterator.hasNext()).isTrue() - } - - private fun runTestWithCollection(collection: MutableMap) { - - for (i in 1..100) { - collection[i] = i - } - - val iterator = collection.iterator() - iterator.next() - - val checkpoint = TestCheckpoint(collection, iterator) - - val serializedBytes = checkpoint.checkpointSerialize(context) - val deserializedCheckpoint = serializedBytes.checkpointDeserialize(context) - - assertThat(deserializedCheckpoint.list).isEqualTo(collection) - assertThat(deserializedCheckpoint.iterator.next().key).isEqualTo(2) - assertThat(deserializedCheckpoint.iterator.hasNext()).isTrue() - } -} diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt deleted file mode 100644 index e0906294b4..0000000000 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoCheckpointTest.kt +++ /dev/null @@ -1,171 +0,0 @@ -package net.corda.nodeapi.internal.serialization.kryo - -import org.junit.Ignore -import org.junit.Test -import org.junit.jupiter.api.assertDoesNotThrow -import java.util.LinkedList -import kotlin.test.assertEquals - -class KryoCheckpointTest { - - private val testSize = 1000L - - /** - * This test just ensures that the checkpoints still work in light of [LinkedHashMapEntrySerializer]. - */ - @Test(timeout=300_000) - fun `linked hash map can checkpoint without error`() { - var lastKey = "" - val dummyMap = linkedMapOf() - for (i in 0..testSize) { - dummyMap[i.toString()] = i - } - var it = dummyMap.iterator() - while (it.hasNext()) { - lastKey = it.next().key - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals(testSize.toString(), lastKey) - } - - @Test(timeout=300_000) - fun `empty linked hash map can checkpoint without error`() { - val dummyMap = linkedMapOf() - val it = dummyMap.iterator() - val itKeys = dummyMap.keys.iterator() - val itValues = dummyMap.values.iterator() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - val bytesKeys = KryoCheckpointSerializer.serialize(itKeys, KRYO_CHECKPOINT_CONTEXT) - val bytesValues = KryoCheckpointSerializer.serialize(itValues, KRYO_CHECKPOINT_CONTEXT) - assertDoesNotThrow { - KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - KryoCheckpointSerializer.deserialize(bytesKeys, itKeys.javaClass, KRYO_CHECKPOINT_CONTEXT) - KryoCheckpointSerializer.deserialize(bytesValues, itValues.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - } - - @Test(timeout=300_000) - fun `linked hash map with null values can checkpoint without error`() { - val dummyMap = linkedMapOf().apply { - put("foo", 2L) - put(null, null) - put("bar", 3L) - } - val it = dummyMap.iterator() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - - val itKeys = dummyMap.keys.iterator() - itKeys.next() - itKeys.next() - val bytesKeys = KryoCheckpointSerializer.serialize(itKeys, KRYO_CHECKPOINT_CONTEXT) - - val itValues = dummyMap.values.iterator() - val bytesValues = KryoCheckpointSerializer.serialize(itValues, KRYO_CHECKPOINT_CONTEXT) - - assertDoesNotThrow { - KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - val desItKeys = KryoCheckpointSerializer.deserialize(bytesKeys, itKeys.javaClass, KRYO_CHECKPOINT_CONTEXT) - assertEquals("bar", desItKeys.next()) - KryoCheckpointSerializer.deserialize(bytesValues, itValues.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - } - - @Test(timeout=300_000) - fun `linked hash map keys can checkpoint without error`() { - var lastKey = "" - val dummyMap = linkedMapOf() - for (i in 0..testSize) { - dummyMap[i.toString()] = i - } - var it = dummyMap.keys.iterator() - while (it.hasNext()) { - lastKey = it.next() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals(testSize.toString(), lastKey) - } - - @Test(timeout=300_000) - fun `linked hash map values can checkpoint without error`() { - var lastValue = 0L - val dummyMap = linkedMapOf() - for (i in 0..testSize) { - dummyMap[i.toString()] = i - } - var it = dummyMap.values.iterator() - while (it.hasNext()) { - lastValue = it.next() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals(testSize, lastValue) - } - - @Test(timeout = 300_000) - fun `linked hash map values can checkpoint without error, even with repeats`() { - var lastValue = "0" - val dummyMap = linkedMapOf() - for (i in 0..testSize) { - dummyMap[i.toString()] = (i % 10).toString() - } - var it = dummyMap.values.iterator() - while (it.hasNext()) { - lastValue = it.next() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals((testSize % 10).toString(), lastValue) - } - - @Ignore("Kryo optimizes boxed primitives so this does not work. Need to customise ReferenceResolver to stop it doing it.") - @Test(timeout = 300_000) - fun `linked hash map values can checkpoint without error, even with repeats for boxed primitives`() { - var lastValue = 0L - val dummyMap = linkedMapOf() - for (i in 0..testSize) { - dummyMap[i.toString()] = (i % 10) - } - var it = dummyMap.values.iterator() - while (it.hasNext()) { - lastValue = it.next() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals(testSize % 10, lastValue) - } - - /** - * This test just ensures that the checkpoints still work in light of [LinkedHashMapEntrySerializer]. - */ - @Test(timeout=300_000) - fun `linked hash set can checkpoint without error`() { - var result: Any = 0L - val dummySet = linkedSetOf().apply { addAll(0..testSize) } - var it = dummySet.iterator() - while (it.hasNext()) { - result = it.next() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals(testSize, result) - } - - /** - * This test just ensures that the checkpoints still work in light of [LinkedListItrSerializer]. - */ - @Test(timeout=300_000) - fun `linked list can checkpoint without error`() { - var result: Any = 0L - val dummyList = LinkedList().apply { addAll(0..testSize) } - - var it = dummyList.iterator() - while (it.hasNext()) { - result = it.next() - val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT) - it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT) - } - assertEquals(testSize, result) - } -} diff --git a/node/capsule/src/main/resources/node-jvm-args.txt b/node/capsule/src/main/resources/node-jvm-args.txt index 21d6d9f829..549e0b3d6f 100644 --- a/node/capsule/src/main/resources/node-jvm-args.txt +++ b/node/capsule/src/main/resources/node-jvm-args.txt @@ -1,9 +1,3 @@ --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED ---add-opens=java.base/java.nio=ALL-UNNAMED ---add-opens=java.base/java.security=ALL-UNNAMED ---add-opens=java.base/java.security.cert=ALL-UNNAMED ---add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED ---add-opens=java.base/java.util.concurrent=ALL-UNNAMED ---add-opens=java.sql/java.sql=ALL-UNNAMED diff --git a/node/src/integration-test/kotlin/net/corda/node/CustomSerializationSchemeDriverTest.kt b/node/src/integration-test/kotlin/net/corda/node/CustomSerializationSchemeDriverTest.kt index 7a30f4840b..eba07c7089 100644 --- a/node/src/integration-test/kotlin/net/corda/node/CustomSerializationSchemeDriverTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/CustomSerializationSchemeDriverTest.kt @@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy -import de.javakaffee.kryoserializers.ArraysAsListSerializer import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint import net.corda.core.contracts.BelongsToContract import net.corda.core.contracts.Contract @@ -58,7 +57,6 @@ import org.objenesis.strategy.StdInstantiatorStrategy import java.io.ByteArrayOutputStream import java.lang.reflect.Modifier import java.security.PublicKey -import java.util.Arrays import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -305,8 +303,6 @@ class CustomSerializationSchemeDriverTest { kryo.isRegistrationRequired = false kryo.instantiatorStrategy = CustomInstantiatorStrategy() kryo.classLoader = classLoader - @Suppress("ReplaceJavaStaticMethodWithKotlinAnalog") - kryo.register(Arrays.asList("").javaClass, ArraysAsListSerializer()) } //Stolen from DefaultKryoCustomizer.kt diff --git a/node/src/test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt b/node/src/integration-test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt similarity index 100% rename from node/src/test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt rename to node/src/integration-test/kotlin/net/corda/node/internal/telemetry/TelemetryTests.kt diff --git a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt similarity index 78% rename from node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt rename to node/src/integration-test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt index f4f10163ef..0b8147d3a6 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt @@ -6,10 +6,10 @@ import net.corda.core.flows.StartableByRPC import net.corda.core.internal.packageName import net.corda.core.messaging.startFlow import net.corda.core.schemas.MappedSchema -import net.corda.core.schemas.PersistentState import net.corda.core.utilities.getOrThrow import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.schema.NodeSchemaService.NodeCoreV1 +import net.corda.node.services.schema.PersistentStateServiceTests.TestSchema import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver import net.corda.testing.driver.internal.InProcessImpl @@ -17,11 +17,8 @@ import net.corda.testing.internal.vault.DummyLinearStateSchemaV1 import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.cordappsForPackages import net.corda.testing.node.internal.enclosedCordapp -import org.hibernate.annotations.Cascade -import org.hibernate.annotations.CascadeType import org.junit.Ignore import org.junit.Test -import javax.persistence.* import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -115,34 +112,4 @@ class NodeSchemaServiceTest { return (this.serviceHub as ServiceHubInternal).schemaService.schemas.map { it.name } } } - - class SchemaFamily - - object TestSchema : MappedSchema(SchemaFamily::class.java, 1, setOf(Parent::class.java, Child::class.java)) { - @Entity - @Table(name = "Parents") - class Parent : PersistentState() { - @OneToMany(fetch = FetchType.LAZY) - @JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index")) - @OrderColumn - @Cascade(CascadeType.PERSIST) - var children: MutableSet = mutableSetOf() - } - - @Suppress("unused") - @Entity - @Table(name = "Children") - class Child { - @Id - @GeneratedValue - @Column(name = "child_id", unique = true, nullable = false) - var childId: Int? = null - - @ManyToOne(fetch = FetchType.LAZY) - @JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index")) - var parent: Parent? = null - } - } - } - diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt similarity index 100% rename from node/src/test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt rename to node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowMetadataRecordingTest.kt diff --git a/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/notary/experimental/raft/RaftNotaryServiceTests.kt similarity index 100% rename from node/src/test/kotlin/net/corda/notary/experimental/raft/RaftNotaryServiceTests.kt rename to node/src/integration-test/kotlin/net/corda/notary/experimental/raft/RaftNotaryServiceTests.kt diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 68b08e951c..45a7732c4b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -3,6 +3,7 @@ package net.corda.node.internal import co.paralleluniverse.fibers.instrument.Retransform import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry +import com.google.common.collect.ImmutableList import com.google.common.collect.MutableClassToInstanceMap import com.google.common.util.concurrent.MoreExecutors import com.zaxxer.hikari.pool.HikariPool @@ -210,6 +211,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, @Suppress("LeakingThis") private var tokenizableServices: MutableList? = mutableListOf(platformClock, this) + private var frozenTokenizableServices: List? = null val metricRegistry = MetricRegistry() protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize() @@ -361,10 +363,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private val nodeServicesContext = object : NodeServicesContext { override val platformVersion = versionInfo.platformVersion override val configurationWithOptions = configuration.configurationWithOptions - // Note: tokenizableServices passed by reference meaning that any subsequent modification to the content in the `AbstractNode` will - // be reflected in the context as well. However, since context only has access to immutable collection it can only read (but not modify) - // the content. - override val tokenizableServices: List = this@AbstractNode.tokenizableServices!! + override val tokenizableServices: List get() = this@AbstractNode.frozenTokenizableServices!! } private val nodeLifecycleEventsDistributor = NodeLifecycleEventsDistributor().apply { add(checkpointDumper) } @@ -483,6 +482,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, "Node's platform version is lower than network's required minimumPlatformVersion" } networkMapCache.start(netParams.notaries) + services.networkParameters = netParams database.transaction { networkParametersStorage.setCurrentParameters(signedNetParams, trustRoots) @@ -622,10 +622,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, vaultService.start() ScheduledActivityObserver.install(vaultService, schedulerService, flowLogicRefFactory) - val frozenTokenizableServices = tokenizableServices!! + frozenTokenizableServices = ImmutableList.copyOf(tokenizableServices!!) tokenizableServices = null - verifyCheckpointsCompatible(frozenTokenizableServices) + verifyCheckpointsCompatible(frozenTokenizableServices!!) partyInfoCache.start() encryptionService.start(nodeInfo.legalIdentities[0]) @@ -634,7 +634,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, state machine manager from starting (just below this) until the service is ready. */ nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.BeforeStateMachineStart(nodeServicesContext)).get() - val callback = smm.start(frozenTokenizableServices) + val callback = smm.start(frozenTokenizableServices!!) val smmStartedFuture = rootFuture.map { callback() } // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } @@ -1205,8 +1205,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache get() = this@AbstractNode.attachmentsClassLoaderCache @Volatile - private lateinit var _networkParameters: NetworkParameters - override val networkParameters: NetworkParameters get() = _networkParameters + override lateinit var networkParameters: NetworkParameters init { this@AbstractNode.attachments.servicesForResolution = this @@ -1214,7 +1213,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, fun start(myInfo: NodeInfo, networkParameters: NetworkParameters) { this._myInfo = myInfo - this._networkParameters = networkParameters + this.networkParameters = networkParameters } override fun cordaService(type: Class): T { @@ -1296,7 +1295,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } override fun onNewNetworkParameters(networkParameters: NetworkParameters) { - this._networkParameters = networkParameters + this.networkParameters = networkParameters } override fun tryExternalVerification(stx: SignedTransaction, checkSufficientSignatures: Boolean): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 750ce86f73..85b1f68ef8 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -10,6 +10,7 @@ import net.corda.core.internal.ThreadBox import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.internal.toImmutableList import net.corda.core.messaging.DataFeed import net.corda.core.node.services.SignedTransactionWithStatus import net.corda.core.serialization.SerializationContext @@ -41,7 +42,6 @@ import org.hibernate.annotations.Type import rx.Observable import rx.subjects.PublishSubject import java.time.Instant -import java.util.Collections import javax.persistence.AttributeConverter import javax.persistence.Column import javax.persistence.Convert @@ -398,20 +398,20 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac } // Cache value type to just store the immutable bits of a signed transaction plus conversion helpers - internal class TxCacheValue( + private class TxCacheValue( val txBits: SerializedBytes, val sigs: List, val status: TransactionStatus ) { constructor(stx: SignedTransaction, status: TransactionStatus) : this( stx.txBits, - Collections.unmodifiableList(stx.sigs), + stx.sigs.toImmutableList(), status ) constructor(stx: SignedTransaction, status: TransactionStatus, sigs: List?) : this( stx.txBits, - if (sigs == null) Collections.unmodifiableList(stx.sigs) else Collections.unmodifiableList(stx.sigs + sigs).distinct(), + if (sigs == null) stx.sigs.toImmutableList() else stx.sigs.toMutableSet().apply { addAll(sigs) }.toImmutableList(), status ) fun toSignedTx() = SignedTransaction(txBits, sigs) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 50f2c046e0..7852c17a7b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -32,11 +32,11 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.isIdempotentFlow import net.corda.core.internal.location -import net.corda.core.internal.toPath -import net.corda.core.internal.uncheckedCast import net.corda.core.internal.telemetry.ComponentTelemetryIds import net.corda.core.internal.telemetry.SerializedTelemetry import net.corda.core.internal.telemetry.telemetryServiceInternal +import net.corda.core.internal.toPath +import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationContext @@ -363,7 +363,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, errorAndTerminate("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", t) } logFlowError(t) - Try.Failure(t) + Try.Failure(t) } val softLocksId = if (softLockedStates.isNotEmpty()) logic.runId.uuid else null val finalEvent = when (resultOrError) { @@ -499,7 +499,6 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, permissionName, permissionGranted) serviceHub.auditService.recordAuditEvent(checkPermissionEvent) - @Suppress("ConstantConditionIf") if (!permissionGranted) { throw FlowPermissionException("User ${context.principal()} not permissioned for $permissionName on flow $id") } @@ -540,7 +539,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val serializationContext = TransientReference(transientValues.checkpointSerializationContext) val transaction = extractThreadLocalTransaction() val telemetryIds = retrieveTelemetryIds() - parkAndSerialize { _, _ -> + parkAndCustomSerialize { _ -> setLoggingContext() logger.trace { "Suspended on $ioRequest" } diff --git a/node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt index 9f674c6233..cc6c23b926 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt @@ -1,25 +1,41 @@ package net.corda.node.services.schema -import net.corda.core.contracts.* +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState +import net.corda.coretesting.internal.rigorousMock import net.corda.node.services.api.SchemaService -import net.corda.node.services.schema.NodeSchemaServiceTest.TestSchema import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.testing.contracts.DummyContract import net.corda.testing.core.TestIdentity import net.corda.testing.internal.LogHelper import net.corda.testing.internal.configureDatabase -import net.corda.coretesting.internal.rigorousMock import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import org.hibernate.annotations.Cascade +import org.hibernate.annotations.CascadeType import org.junit.After import org.junit.Before import org.junit.Test +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.FetchType +import javax.persistence.GeneratedValue +import javax.persistence.Id +import javax.persistence.JoinColumn +import javax.persistence.JoinColumns +import javax.persistence.ManyToOne +import javax.persistence.OneToMany +import javax.persistence.OrderColumn +import javax.persistence.Table import kotlin.test.assertEquals class PersistentStateServiceTests { @@ -81,4 +97,32 @@ class PersistentStateServiceTests { database.close() } -} \ No newline at end of file + + class SchemaFamily + + object TestSchema : MappedSchema(SchemaFamily::class.java, 1, setOf(Parent::class.java, Child::class.java)) { + @Entity + @Table(name = "Parents") + class Parent : PersistentState() { + @OneToMany(fetch = FetchType.LAZY) + @JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index")) + @OrderColumn + @Cascade(CascadeType.PERSIST) + var children: MutableSet = mutableSetOf() + } + + @Suppress("unused") + @Entity + @Table(name = "Children") + class Child { + @Id + @GeneratedValue + @Column(name = "child_id", unique = true, nullable = false) + var childId: Int? = null + + @ManyToOne(fetch = FetchType.LAZY) + @JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index")) + var parent: Parent? = null + } + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt index 29d55aa2ce..e210d0f8d1 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt @@ -10,6 +10,9 @@ import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.messaging.MessageRecipients +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.seconds @@ -41,15 +44,15 @@ class FlowOperatorTests { val EUGENE_NAME = CordaX500Name("Eugene", "EugeneCorp", "GB") } - lateinit var mockNet: InternalMockNetwork - lateinit var aliceNode: TestStartedNode + private lateinit var mockNet: InternalMockNetwork + private lateinit var aliceNode: TestStartedNode private lateinit var aliceParty: Party - lateinit var bobNode: TestStartedNode + private lateinit var bobNode: TestStartedNode private lateinit var bobParty: Party - lateinit var charlieNode: TestStartedNode + private lateinit var charlieNode: TestStartedNode private lateinit var charlieParty: Party - lateinit var daveNode: TestStartedNode - lateinit var daveParty: Party + private lateinit var daveNode: TestStartedNode + private lateinit var daveParty: Party private lateinit var eugeneNode: TestStartedNode private lateinit var eugeneParty: Party @@ -216,8 +219,7 @@ class FlowOperatorTests { fun `mixed query should return all flows which are waiting for counter party to process`() { charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { AcceptingFlow("Hello", it) } - val future = CompletableFuture() - aliceNode.services.startFlow(ExternalAsyncOperationFlow(future)) + aliceNode.services.startFlow(ExternalAsyncOperationFlow()) val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty))) val daveStart = aliceNode.services.startFlow(GetFlowInfoFlow(listOf(daveParty))) charlieNode.services.startFlow(ReceiveFlow("Hello", listOf(charlieParty))) @@ -249,8 +251,7 @@ class FlowOperatorTests { @Test(timeout = 300_000) fun `query should return all flows which are waiting for counter party (the flow must have counter party) to process grouped by party`() { - val future = CompletableFuture() - aliceNode.services.startFlow(ExternalAsyncOperationFlow(future)) + aliceNode.services.startFlow(ExternalAsyncOperationFlow()) val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty))) val daveStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty))) @@ -380,12 +381,11 @@ class FlowOperatorTests { @Test(timeout = 300_000) fun `query should return all flows which are waiting for async external operations`() { - val future = CompletableFuture() - val start = aliceNode.services.startFlow(ExternalAsyncOperationFlow(future)) + val start = aliceNode.services.startFlow(ExternalAsyncOperationFlow()) val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock) - executeTest(5.seconds, { future.complete("Hello") }) { + executeTest(5.seconds, { aliceNode.services.startFlow(CompleteFutureFlow("Hello")) }) { val result = cut.queryWaitingFlows(WaitingFlowQuery( waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION) )) // the list of counter parties must be empty to get any external operation @@ -400,12 +400,11 @@ class FlowOperatorTests { @Test(timeout = 300_000) fun `query should return all flows which are waiting for external operations`() { - val future = CompletableFuture() - val start = aliceNode.services.startFlow(ExternalOperationFlow(future)) + val start = aliceNode.services.startFlow(ExternalOperationFlow()) val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock) - executeTest(5.seconds, { future.complete("Hello") }) { + executeTest(5.seconds, { aliceNode.services.startFlow(CompleteFutureFlow("Hello")) }) { val result = cut.queryWaitingFlows(WaitingFlowQuery()) assertEquals(1, result.size) @@ -512,33 +511,46 @@ class FlowOperatorTests { } @InitiatingFlow - class ExternalAsyncOperationFlow(private val future: CompletableFuture) : FlowLogic() { + class ExternalAsyncOperationFlow : FlowLogic() { @Suspendable override fun call() { - await(ExternalOperation(future)) + await(ExternalOperation(serviceHub.cordaService(FutureService::class.java))) } - class ExternalOperation(private val future: CompletableFuture) : FlowExternalAsyncOperation { + class ExternalOperation(private val futureService: FutureService) : FlowExternalAsyncOperation { override fun execute(deduplicationId: String): CompletableFuture { - return future + return futureService.future } } } @InitiatingFlow - class ExternalOperationFlow(private val future: CompletableFuture) : FlowLogic() { + class ExternalOperationFlow : FlowLogic() { @Suspendable override fun call() { - await(ExternalOperation(future)) + await(ExternalOperation(serviceHub.cordaService(FutureService::class.java))) } - class ExternalOperation(private val future: CompletableFuture) : FlowExternalOperation { + class ExternalOperation(private val futureService: FutureService) : FlowExternalOperation { override fun execute(deduplicationId: String): String { - return future.get() + return futureService.future.get() } } } + class CompleteFutureFlow(private val value: String) : FlowLogic() { + @Suspendable + override fun call() { + serviceHub.cordaService(FutureService::class.java).future.complete(value) + } + } + + @Suppress("unused") + @CordaService + class FutureService(private val services: AppServiceHub) : SingletonSerializeAsToken() { + val future = CompletableFuture() + } + @InitiatingFlow class SleepFlow : FlowLogic() { @Suspendable diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowSoftLocksTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowSoftLocksTests.kt index 1930e7ffd8..7072833fa2 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowSoftLocksTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowSoftLocksTests.kt @@ -6,6 +6,7 @@ import net.corda.core.contracts.StateRef import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.mapToSet import net.corda.core.node.services.Vault import net.corda.core.node.services.VaultService import net.corda.core.node.services.queryBy @@ -34,7 +35,6 @@ import org.junit.After import org.junit.Assert.assertEquals import org.junit.Before import org.junit.Test -import java.lang.IllegalStateException import java.sql.SQLTransientConnectionException import java.util.UUID import kotlin.test.assertFailsWith @@ -75,7 +75,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with its own flow id and then manually releases them`() { - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet() + val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref } val softLockActions = arrayOf( SoftLockAction(SoftLockingAction.LOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates), SoftLockAction(SoftLockingAction.UNLOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.UNLOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET) @@ -87,7 +87,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with its own flow id and by default releases them when completing`() { - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet() + val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref } val softLockActions = arrayOf( SoftLockAction(SoftLockingAction.LOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates) ) @@ -98,7 +98,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with its own flow id and by default releases them when errors`() { - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet() + val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref } val softLockActions = arrayOf( SoftLockAction( SoftLockingAction.LOCK, @@ -106,7 +106,7 @@ class FlowSoftLocksTests { vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates, - exception = IllegalStateException("Throwing error after flow has soft locked states") + throwException = { throw IllegalStateException("Throwing error after flow has soft locked states") } ) ) assertFailsWith { @@ -119,7 +119,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with random id and then manually releases them`() { val randomId = UUID.randomUUID() - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet() + val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref } val softLockActions = arrayOf( SoftLockAction(SoftLockingAction.LOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET), SoftLockAction(SoftLockingAction.UNLOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.UNLOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET) @@ -132,7 +132,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with random id and does not release them upon completing`() { val randomId = UUID.randomUUID() - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet() + val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref } val softLockActions = arrayOf( SoftLockAction(SoftLockingAction.LOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET) ) @@ -145,7 +145,7 @@ class FlowSoftLocksTests { fun `flow only releases by default reserved states with flow id upon completing`() { // lock with flow id and random id, dont manually release any. At the end, check that only flow id ones got unlocked. val randomId = UUID.randomUUID() - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList() + val vaultStates = fillVault(aliceNode, 10).states.map { it.ref } val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet() val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet() val softLockActions = arrayOf( @@ -161,7 +161,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with flow id and random id, then releases the flow id ones - assert the random id ones are still locked`() { val randomId = UUID.randomUUID() - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList() + val vaultStates = fillVault(aliceNode, 10).states.map { it.ref } val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet() val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet() val softLockActions = arrayOf( @@ -178,7 +178,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `flow reserves fungible states with flow id and random id, then releases the random id ones - assert the flow id ones are still locked inside the flow`() { val randomId = UUID.randomUUID() - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList() + val vaultStates = fillVault(aliceNode, 10).states.map { it.ref } val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet() val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet() val softLockActions = arrayOf( @@ -206,7 +206,7 @@ class FlowSoftLocksTests { @Test(timeout=300_000) fun `when flow soft locks, then errors and retries from previous checkpoint, softLockedStates are reverted back correctly`() { val randomId = UUID.randomUUID() - val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList() + val vaultStates = fillVault(aliceNode, 10).states.map { it.ref } val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet() val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet() val softLockActions = arrayOf( @@ -226,7 +226,7 @@ class FlowSoftLocksTests { randomIdStates, ExpectedSoftLocks(EMPTY_SET, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET, - exception = SQLTransientConnectionException("connection is not available") + throwException = { throw SQLTransientConnectionException("connection is not available") } ) ) val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds) @@ -235,7 +235,7 @@ class FlowSoftLocksTests { LockingUnlockingFlow.throwOnlyOnce = true } - private fun fillVault(node: TestStartedNode, thisManyStates: Int): Vault? { + private fun fillVault(node: TestStartedNode, thisManyStates: Int): Vault { val bankNode = mockNet.createPartyNode(BOC_NAME) val bank = bankNode.info.singleIdentity() val cashIssuer = bank.ref(1) @@ -265,7 +265,7 @@ data class SoftLockAction(val action: SoftLockingAction, val states: Set, val expectedSoftLocks: ExpectedSoftLocks, val expectedSoftLockedStates: Set, - val exception: Exception? = null, + val throwException: (() -> Nothing)? = null, val doCheckpoint: Boolean = false) internal class LockingUnlockingFlow(private val softLockActions: Array): FlowLogic() { @@ -296,10 +296,10 @@ internal class LockingUnlockingFlow(private val softLockActions: Array>, (List<*>) -> Collection<*>> = Collections.unmodifiableMap(linkedMapOf( Collection::class.java to { list -> Collections.unmodifiableCollection(list) }, - List::class.java to { list -> Collections.unmodifiableList(list) }, - Set::class.java to { list -> Collections.unmodifiableSet(LinkedHashSet(list)) }, - SortedSet::class.java to { list -> Collections.unmodifiableSortedSet(TreeSet(list)) }, + List::class.java to { list -> list.toImmutableList() }, + Set::class.java to { list -> list.toImmutableSet() }, + SortedSet::class.java to { list -> list.toImmutableSortedSet() }, NavigableSet::class.java to { list -> Collections.unmodifiableNavigableSet(TreeSet(list)) }, NonEmptySet::class.java to { list -> NonEmptySet.copyOf(list) } )) diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/MapSerializer.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/MapSerializer.kt index 6b1d7e7b12..f8a9673bd3 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/MapSerializer.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/MapSerializer.kt @@ -1,5 +1,7 @@ package net.corda.serialization.internal.amqp +import net.corda.core.internal.toImmutableMap +import net.corda.core.internal.toImmutableSortedMap import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.SerializationContext import net.corda.serialization.internal.model.LocalTypeInformation @@ -9,8 +11,13 @@ import org.apache.qpid.proton.codec.Data import java.io.NotSerializableException import java.lang.reflect.ParameterizedType import java.lang.reflect.Type -import java.util.* -import kotlin.collections.LinkedHashMap +import java.util.Collections +import java.util.Dictionary +import java.util.EnumMap +import java.util.NavigableMap +import java.util.SortedMap +import java.util.TreeMap +import java.util.WeakHashMap private typealias MapCreationFunction = (Map<*, *>) -> Map<*, *> @@ -26,8 +33,8 @@ class MapSerializer(private val declaredType: ParameterizedType, factory: LocalS // NB: Order matters in this map, the most specific classes should be listed at the end private val supportedTypes: Map>, MapCreationFunction> = Collections.unmodifiableMap(linkedMapOf( // Interfaces - Map::class.java to { map -> Collections.unmodifiableMap(map) }, - SortedMap::class.java to { map -> Collections.unmodifiableSortedMap(TreeMap(map)) }, + Map::class.java to { map -> map.toImmutableMap() }, + SortedMap::class.java to { map -> map.toImmutableSortedMap() }, NavigableMap::class.java to { map -> Collections.unmodifiableNavigableMap(TreeMap(map)) }, // concrete classes for user convenience LinkedHashMap::class.java to { map -> LinkedHashMap(map) }, diff --git a/settings.gradle b/settings.gradle index 67e26a11be..1ea0db1695 100644 --- a/settings.gradle +++ b/settings.gradle @@ -64,6 +64,7 @@ include 'experimental:quasar-hook' include 'experimental:corda-utils' include 'experimental:nodeinfo' include 'experimental:netparams' +include 'experimental:raft-tests' include 'test-common' include 'test-cli' include 'test-utils' @@ -105,6 +106,7 @@ include 'samples:network-verifier:workflows' include 'serialization' include 'serialization-1.2' include 'serialization-tests' +include 'checkpoint-tests' include 'testing:cordapps:dbfailure:dbfcontracts' include 'testing:cordapps:dbfailure:dbfworkflows' include 'testing:cordapps:missingmigration'