ENT-11328: Update node initialisation to be flexible if JDK modules are not open to reflection

This commit is contained in:
Shams Asari 2024-01-03 16:17:02 +00:00
parent ab4e63ee0d
commit 0123b141d3
51 changed files with 782 additions and 935 deletions

View File

@ -4624,8 +4624,6 @@ public final class net.corda.core.node.NetworkParameters extends java.lang.Objec
@NotNull
public String toString()
##
public final class net.corda.core.node.NetworkParametersKt extends java.lang.Object
##
@CordaSerializable
public final class net.corda.core.node.NodeDiagnosticInfo extends java.lang.Object
public <init>(String, String, int, String, java.util.List)

View File

@ -121,7 +121,6 @@ buildscript {
ext.fontawesomefx_commons_version = constants.getProperty("fontawesomefxCommonsVersion")
ext.fontawesomefx_fontawesome_version = constants.getProperty("fontawesomefxFontawesomeVersion")
ext.javaassist_version = constants.getProperty("javaassistVersion")
ext.corda_revision = {
try {
"git rev-parse HEAD".execute().text.trim()
@ -131,6 +130,8 @@ buildscript {
}
}()
ext.corda_docs_link = "https://docs.corda.net/docs/corda-os/$baseVersion"
ext.node_jvm_args = project(":node:capsule").file("src/main/resources/node-jvm-args.txt").readLines()
repositories {
mavenLocal()
// Use system environment to activate caching with Artifactory,
@ -310,7 +311,6 @@ allprojects {
}
tasks.withType(Test).configureEach {
jvmArgs += project(":node:capsule").file("src/main/resources/node-jvm-args.txt").readLines()
jvmArgs += "--add-modules=jdk.incubator.foreign" // For the SharedMemoryIncremental
forkEvery = 20
ignoreFailures = project.hasProperty('tests.ignoreFailures') ? project.property('tests.ignoreFailures').toBoolean() : false

View File

@ -0,0 +1,9 @@
# Checkpoint tests
Restoring checkpoints require certain [JDK modules to be open](../node/capsule/src/main/resources/node-jvm-args.txt) (due to the use of
reflection). This isn't an issue for the node, as we can open up these modules via the Capsule and so doesn't impact the user in anyway. For
client code that connects to the node, or uses the Corda API outside of the node, we would rather not mandate that users also have to do
this. So, to ensure we don't accidently do that, we don't add these flags to our tests.
This module exists for those tests which are not using the out-of-process node driver, but need to test checkpoint deserialisation. The same
node JVM args are used, and so replicates the exact behaviour of checkpoint restoration as the node.

View File

@ -0,0 +1,25 @@
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'net.corda.plugins.quasar-utils'
description 'Checkpoint tests'
dependencies {
testImplementation project(path: ':core', configuration: 'testArtifacts')
testImplementation project(":serialization")
testImplementation project(":node-api")
testImplementation project(':core-test-utils')
testImplementation project(':test-utils')
testImplementation project(":node-driver")
testImplementation "junit:junit:$junit_version"
testImplementation "org.junit.jupiter:junit-jupiter-api:$junit_jupiter_version"
testImplementation "org.assertj:assertj-core:$assertj_version"
testImplementation "org.mockito.kotlin:mockito-kotlin:$mockito_kotlin_version"
testImplementation "com.esotericsoftware:kryo:$kryo_version"
testImplementation "com.google.guava:guava:$guava_version"
testImplementation "io.netty:netty-common:$netty_version"
testImplementation "org.slf4j:slf4j-api:$slf4j_version"
}
test {
jvmArgs += node_jvm_args
}

View File

@ -1,4 +1,4 @@
package net.corda.coretests.flows
package net.corda.core.flows
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberExecutorScheduler
@ -11,11 +11,10 @@ import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.rootCause
import net.corda.core.utilities.getOrThrow
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.catchThrowable
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.junit.Assert.assertThat
import org.junit.Test
import java.util.*
import java.util.UUID
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
@ -65,7 +64,7 @@ class FastThreadLocalTest {
override fun initialValue() = ExpensiveObj()
}
runFibers(100, threadLocal::get) // Return value could be anything.
assertThat(expensiveObjCount.get(), lessThanOrEqualTo(3))
assertThat(expensiveObjCount.get()).isLessThanOrEqualTo(3)
}
/** @return the number of times a different expensive object was obtained post-suspend. */
@ -104,7 +103,6 @@ class FastThreadLocalTest {
}
private fun contentIsNotSerialized(threadLocalGet: () -> UnserializableObj) = scheduled(1, ::FastThreadLocalThread) {
// Use false like AbstractKryoSerializationScheme, the default of true doesn't work at all:
val serializer = Fiber.getFiberSerializer(false)
val returnValue = UUID.randomUUID()
val deserializedFiber = serializer.read(openFuture<ByteArray>().let {
@ -133,7 +131,7 @@ class FastThreadLocalTest {
obj = null
}
// In retainObj false case, check this doesn't attempt to serialize fields of currentThread:
Fiber.parkAndSerialize { fiber, _ -> bytesFuture.capture { serializer.write(fiber) } }
Fiber.parkAndCustomSerialize { fiber -> bytesFuture.capture { serializer.write(fiber) } }
return returnValue
}
}

View File

@ -0,0 +1,152 @@
package net.corda.nodeapi.internal.serialization.kryo
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule
import org.assertj.core.api.Assertions.assertThat
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.time.Instant
import java.util.LinkedList
import kotlin.test.assertEquals
class KryoCheckpointTest {
companion object {
// A value big enough to trigger any stack overflow issues
private const val SIZE = 10_000
private const val CHUNK = 2
}
@Rule
@JvmField
val serializationRule = CheckpointSerializationEnvironmentRule()
@Ignore("Kryo optimizes boxed primitives so this does not work. Need to customise ReferenceResolver to stop it doing it.")
@Test(timeout = 300_000)
fun `linked hash map values can checkpoint without error, even with repeats for boxed primitives`() {
var lastValue = 0
val dummyMap = linkedMapOf<String, Int>()
for (i in 0..SIZE) {
dummyMap[i.toString()] = (i % 10)
}
var it = dummyMap.values.iterator()
while (it.hasNext()) {
lastValue = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(SIZE % 10, lastValue)
}
@Test(timeout=300_000)
fun `ArrayList iterator can checkpoint without error`() {
testIteratorCheckpointing(ArrayList())
}
@Test(timeout=300_000)
fun `LinkedList iterator can checkpoint without error`() {
testIteratorCheckpointing(LinkedList())
}
@Test(timeout=300_000)
fun `HashSet iterator can checkpoint without error`() {
testIteratorCheckpointing(HashSet())
}
@Test(timeout=300_000)
fun `LinkedHashSet iterator can checkpoint without error`() {
testIteratorCheckpointing(LinkedHashSet())
}
@Test(timeout=300_000)
fun `HashMap iterator can checkpoint without error`() {
testMapIteratorCheckpointing(HashMap())
}
@Test(timeout=300_000)
fun `LinkedHashMap iterator can checkpoint without error`() {
testMapIteratorCheckpointing(LinkedHashMap())
}
@Test(timeout=300_000)
fun `Instant can checkpoint without error`() {
val original = Instant.now()
assertThat(checkpointRoundtrip(original)).isEqualTo(original)
}
private fun testIteratorCheckpointing(collection: MutableCollection<Int>) {
collection.addAll(0 until SIZE)
testIteratorCheckpointing(collection.iterator())
if (collection is List<*>) {
testListIteratorCheckpointing(collection)
}
}
private fun testIteratorCheckpointing(originalIterator: Iterator<*>) {
var endReached = false
for ((_, skip) in testIndices) {
repeat(skip) {
originalIterator.next()
}
val hasNext = originalIterator.hasNext()
val roundtripIterator = checkpointRoundtrip(originalIterator)
assertThat(hasNext).isEqualTo(originalIterator.hasNext()) // Make sure serialising it doesn't change it
assertThat(roundtripIterator.hasNext()).isEqualTo(hasNext)
if (!hasNext) {
endReached = true
break
}
assertThat(roundtripIterator.next()).isEqualTo(originalIterator.next())
}
assertThat(endReached).isTrue()
}
private fun testListIteratorCheckpointing(list: List<*>) {
for ((index, _) in testIndices) {
val originalIterator = list.listIterator(index)
while (true) {
val roundtripIterator = checkpointRoundtrip(originalIterator)
assertThat(roundtripIterator.previousIndex()).isEqualTo(originalIterator.previousIndex())
assertThat(roundtripIterator.hasPrevious()).isEqualTo(originalIterator.hasPrevious())
if (originalIterator.hasPrevious()) {
assertThat(roundtripIterator.previous()).isEqualTo(originalIterator.previous())
roundtripIterator.next()
originalIterator.next()
}
assertThat(roundtripIterator.nextIndex()).isEqualTo(originalIterator.nextIndex())
assertThat(roundtripIterator.hasNext()).isEqualTo(originalIterator.hasNext())
if (!originalIterator.hasNext()) break
assertThat(roundtripIterator.next()).isEqualTo(originalIterator.next())
}
}
}
private fun testMapIteratorCheckpointing(map: MutableMap<Int, Int>) {
repeat(SIZE) { index ->
map[index] = index
}
testIteratorCheckpointing(map.keys.iterator())
testIteratorCheckpointing(map.values.iterator())
testIteratorCheckpointing(map.entries.iterator())
}
private inline fun <reified T : Any> checkpointRoundtrip(obj: T): T {
val bytes = obj.checkpointSerialize(KRYO_CHECKPOINT_CONTEXT)
return bytes.checkpointDeserialize(KRYO_CHECKPOINT_CONTEXT)
}
/**
* Return a Sequence of indicies which just iterates over the first and last [CHUNK], otherwise the tests take too long. The second
* value of the [Pair] is the number of elements to skip over from the previous iteration.
*/
private val testIndices: Sequence<Pair<Int, Int>>
get() = generateSequence(Pair(0, 0)) { (previous, _) ->
when {
previous < CHUNK - 1 -> Pair(previous + 1, 0)
previous == CHUNK - 1 -> Pair(SIZE - CHUNK, SIZE - CHUNK - previous)
previous < SIZE - 1 -> Pair(previous + 1, 0)
else -> null
}
}
}

View File

@ -37,8 +37,6 @@ import net.corda.serialization.internal.encodingNotPermittedFormat
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule
import org.apache.commons.lang3.JavaVersion
import org.apache.commons.lang3.SystemUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.catchThrowable
@ -394,11 +392,7 @@ class KryoTests(private val compression: CordaSerializationEncoding?) {
val obj = Holder(ByteArray(20000))
val uncompressedSize = obj.checkpointSerialize(context.withEncoding(null)).size
val compressedSize = obj.checkpointSerialize(context.withEncoding(CordaSerializationEncoding.SNAPPY)).size
// If these need fixing, sounds like Kryo wire format changed and checkpoints might not survive an upgrade.
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11))
assertEquals(20127, uncompressedSize)
else
assertEquals(20234, uncompressedSize)
assertEquals(1095, compressedSize)
}
}

View File

@ -1,6 +1,5 @@
package net.corda.serialization.internal
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.io.Output
import net.corda.core.serialization.SerializationToken
@ -14,9 +13,7 @@ import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.utilities.OpaqueBytes
import net.corda.coretesting.internal.rigorousMock
import net.corda.nodeapi.internal.serialization.kryo.CordaClassResolver
import net.corda.nodeapi.internal.serialization.kryo.CordaKryo
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
import net.corda.nodeapi.internal.serialization.kryo.KryoCheckpointSerializer
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule
import org.assertj.core.api.Assertions.assertThat
@ -110,7 +107,7 @@ class SerializationTokenTest {
val context = serializeAsTokenContext(tokenizableBefore)
val testContext = this.context.withTokenContext(context)
val kryo: Kryo = DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(this.context)))
val kryo = KryoCheckpointSerializer.createFiberSerializer(this.context).kryo
val stream = ByteArrayOutputStream()
Output(stream).use {
kryoMagic.writeTo(it)

View File

@ -17,8 +17,7 @@ platformVersion=14
openTelemetryVersion=1.20.1
openTelemetrySemConvVersion=1.20.1-alpha
guavaVersion=28.0-jre
# Quasar version to use with Java 8:
quasarVersion=0.9.0_r3
quasarVersion=0.9.1_r3-SNAPSHOT
dockerJavaVersion=3.2.5
proguardVersion=7.3.1
// bouncy castle version must not be changed on a patch release. Needs a full release test cycle to flush out any issues.

View File

@ -1,4 +1,4 @@
This is a Kotlin 1.2 version of the `core` module, which is consumed by the `verifier` module, for verifying contracts written in Kotlin
1.2. This is just a "shell" module which uses the existing the code in `core` and compiles it with the 1.2 compiler.
To allow `core` to benefit from new APIs introduced since 1.2, those APIs much be copied into this module with the same `kotlin` package.
To allow `core` to benefit from new APIs introduced since 1.2, those APIs must be copied into this module with the same `kotlin` package.

View File

@ -201,11 +201,9 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any {
val e = createException()
return await(ExternalAsyncOperation(serviceHub, (SerializableLambda2 { _, _ ->
CompletableFuture<Any>().apply {
completeExceptionally(e)
completeExceptionally(createException())
}
})))
}

View File

@ -256,8 +256,7 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Suspendable
override fun testCode() {
val e = createException()
await(ExternalOperation(serviceHub, (SerializableLambda2 { _, _ -> throw e })))
await(ExternalOperation(serviceHub, (SerializableLambda2 { _, _ -> throw createException() })))
}
private fun createException() = when (exceptionType) {

View File

@ -2,6 +2,11 @@
package net.corda.core.internal
import com.google.common.collect.ImmutableList
import com.google.common.collect.ImmutableMap
import com.google.common.collect.ImmutableSet
import com.google.common.collect.ImmutableSortedMap
import com.google.common.collect.ImmutableSortedSet
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
@ -52,6 +57,8 @@ import java.time.Duration
import java.time.temporal.Temporal
import java.util.Collections
import java.util.PrimitiveIterator
import java.util.SortedMap
import java.util.SortedSet
import java.util.Spliterator
import java.util.Spliterator.DISTINCT
import java.util.Spliterator.IMMUTABLE
@ -61,6 +68,8 @@ import java.util.Spliterator.SIZED
import java.util.Spliterator.SORTED
import java.util.Spliterator.SUBSIZED
import java.util.Spliterators
import java.util.TreeMap
import java.util.TreeSet
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.jar.JarEntry
@ -160,6 +169,79 @@ inline fun <T, R> Iterable<T>.flatMapToSet(transform: (T) -> Iterable<R>): Set<R
}
}
// The following "toImmutable" methods will try to return collections which are friendly to use in mock node checkpoints. This avoids the
// tests from having to add the necessary `--add-opens` args (this is not an issue with normal nodes). This is primarily achieved by using
// the Guava immutable collections. However, they don't support null values, which is why you will see checks for the presense of null
// before using them.
/**
* Returns an immutable [List] which cannot be modified, nor its contents changed indirectly via the receiver. Tries to avoid copying data.
*/
@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
fun <T> Collection<T>.toImmutableList(): List<T> {
return when {
isEmpty() -> emptyList()
size == 1 -> listOf(first())
this is ImmutableList -> this
contains(null as T) -> (this as java.util.Collection<T>).toArray().asList() as List<T>
else -> ImmutableList.copyOf(this)
}
}
/**
* Returns an immutable, iteration preserving, [Set] which cannot be modified, nor its contents changed indirectly via the receiver. Tries
* to avoid copying data.
*/
@Suppress("UNCHECKED_CAST")
fun <T> Collection<T>.toImmutableSet(): Set<T> {
return when {
isEmpty() -> emptySet()
size == 1 -> setOf(first())
contains(null as T) -> Collections.unmodifiableSet(LinkedHashSet(this))
else -> ImmutableSet.copyOf(this)
}
}
/**
* Returns an immutable [SortedSet] which cannot be modified, nor its contents changed indirectly via the receiver. Tries to avoid copying
* data.
*/
@Suppress("UNCHECKED_CAST")
fun <T> Collection<T>.toImmutableSortedSet(): SortedSet<T> {
return when {
isEmpty() -> Collections.emptySortedSet()
contains(null as T) -> Collections.unmodifiableSortedSet(TreeSet(this))
else -> ImmutableSortedSet.copyOf(this)
}
}
/**
* Returns an immutable, iteration preserving, [Map] which cannot be modified, nor its contents changed indirectly via the receiver. Tries
* to avoid copying data.
*/
@Suppress("UNCHECKED_CAST")
fun <K, V> Map<K, V>.toImmutableMap(): Map<K, V> {
return when {
isEmpty() -> emptyMap()
size == 1 -> entries.first().let { Collections.singletonMap(it.key, it.value) }
containsValue(null as V) || containsKey(null as K) -> Collections.unmodifiableMap(LinkedHashMap(this))
else -> ImmutableMap.copyOf(this)
}
}
/**
* Returns an immutable [SortedMap] which cannot be modified, nor its contents changed indirectly via the receiver. Tries to avoid copying
* data.
*/
@Suppress("UNCHECKED_CAST")
fun <K, V> Map<K, V>.toImmutableSortedMap(): SortedMap<K, V> {
return when {
isEmpty() -> Collections.emptySortedMap()
containsValue(null as V) || containsKey(null as K) -> Collections.unmodifiableSortedMap(TreeMap(this))
else -> ImmutableSortedMap.copyOf(this)
}
}
fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options)
/** Same as [InputStream.readBytes] but also closes the stream. */
@ -430,6 +512,8 @@ val Class<*>.packageNameOrNull: String? // This intentionally does not go via `p
}
}
val Class<*>.fullyQualifiedPackage: String get() = "${module.name}/$packageName"
inline val Class<*>.isAbstractClass: Boolean get() = Modifier.isAbstract(modifiers)
inline val Class<*>.isConcreteClass: Boolean get() = !isInterface && !isAbstractClass

View File

@ -5,6 +5,8 @@ import net.corda.core.crypto.toStringShort
import net.corda.core.identity.Party
import net.corda.core.internal.noPackageOverlap
import net.corda.core.internal.requirePackageValid
import net.corda.core.internal.toImmutableList
import net.corda.core.internal.toImmutableMap
import net.corda.core.node.services.AttachmentId
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.DeprecatedConstructorForDeserialization
@ -12,8 +14,6 @@ import net.corda.core.utilities.days
import java.security.PublicKey
import java.time.Duration
import java.time.Instant
import java.util.Collections.unmodifiableList
import java.util.Collections.unmodifiableMap
// DOCSTART 1
/**
@ -245,38 +245,20 @@ data class NetworkParameters(
fun toImmutable(): NetworkParameters {
return NetworkParameters(
minimumPlatformVersion = minimumPlatformVersion,
notaries = unmodifiable(notaries),
notaries = notaries.toImmutableList(),
maxMessageSize = maxMessageSize,
maxTransactionSize = maxTransactionSize,
modifiedTime = modifiedTime,
epoch = epoch,
whitelistedContractImplementations = unmodifiable(whitelistedContractImplementations) { entry ->
unmodifiableList(entry.value)
},
whitelistedContractImplementations = whitelistedContractImplementations.mapValues { it.value.toImmutableList() }.toImmutableMap(),
eventHorizon = eventHorizon,
packageOwnership = unmodifiable(packageOwnership),
packageOwnership = packageOwnership.toImmutableMap(),
recoveryMaximumBackupInterval = recoveryMaximumBackupInterval,
confidentialIdentityMinimumBackupInterval = confidentialIdentityMinimumBackupInterval
)
}
}
private fun <T> unmodifiable(list: List<T>): List<T> {
return if (list.isEmpty()) {
emptyList()
} else {
unmodifiableList(list)
}
}
private inline fun <K, V> unmodifiable(map: Map<K, V>, transform: (Map.Entry<K, V>) -> V = Map.Entry<K, V>::value): Map<K, V> {
return if (map.isEmpty()) {
emptyMap()
} else {
unmodifiableMap(map.mapValues(transform))
}
}
/**
* Data class storing information about notaries available in the network.
* @property identity Identity of the notary (note that it can be an identity of the distributed node).

View File

@ -22,6 +22,7 @@ import net.corda.core.internal.deserialiseCommands
import net.corda.core.internal.deserialiseComponentGroup
import net.corda.core.internal.eagerDeserialise
import net.corda.core.internal.isUploaderTrusted
import net.corda.core.internal.toImmutableList
import net.corda.core.internal.uncheckedCast
import net.corda.core.internal.verification.AbstractVerifier
import net.corda.core.internal.verification.Verifier
@ -32,7 +33,6 @@ import net.corda.core.serialization.SerializationFactory
import net.corda.core.serialization.internal.AttachmentsClassLoaderBuilder
import net.corda.core.serialization.internal.AttachmentsClassLoaderCache
import net.corda.core.utilities.contextLogger
import java.util.Collections.unmodifiableList
import java.util.function.Predicate
import java.util.function.Supplier
@ -125,18 +125,6 @@ private constructor(
companion object {
private val logger = contextLogger()
private fun <T> protect(list: List<T>): List<T> {
return list.run {
if (isEmpty()) {
emptyList()
} else {
unmodifiableList(this)
}
}
}
private fun <T> protectOrNull(list: List<T>?): List<T>? = list?.let(::protect)
@CordaInternal
internal fun create(
inputs: List<StateAndRef<ContractState>>,
@ -168,9 +156,9 @@ private constructor(
privacySalt = privacySalt,
networkParameters = networkParameters,
references = references,
componentGroups = protectOrNull(componentGroups),
serializedInputs = protectOrNull(serializedInputs),
serializedReferences = protectOrNull(serializedReferences),
componentGroups = componentGroups?.toImmutableList(),
serializedInputs = serializedInputs?.toImmutableList(),
serializedReferences = serializedReferences?.toImmutableList(),
isAttachmentTrusted = isAttachmentTrusted,
verifierFactory = verifierFactory,
attachmentsClassLoaderCache = attachmentsClassLoaderCache,
@ -197,16 +185,16 @@ private constructor(
references: List<StateAndRef<ContractState>>,
digestService: DigestService): LedgerTransaction {
return LedgerTransaction(
inputs = protect(inputs),
outputs = protect(outputs),
commands = protect(commands),
attachments = protect(attachments),
inputs = inputs.toImmutableList(),
outputs = outputs.toImmutableList(),
commands = commands.toImmutableList(),
attachments = attachments.toImmutableList(),
id = id,
notary = notary,
timeWindow = timeWindow,
privacySalt = privacySalt,
networkParameters = networkParameters,
references = protect(references),
references = references.toImmutableList(),
componentGroups = null,
serializedInputs = null,
serializedReferences = null,

View File

@ -0,0 +1,5 @@
# Raft tests
Testing the `RaftUniquenessProvider` provider requires the `java.nio` packaage to be open (the atomix library does reflection into
`java.nio.Bits`). This module has this package opened up to allow mock and unit tests to work. This is preferred over having `java.nio` open
in every module as this is an experimental feature.

View File

@ -0,0 +1,33 @@
apply plugin: 'org.jetbrains.kotlin.jvm'
//apply plugin: 'net.corda.plugins.quasar-utils'
description 'Raft tests'
dependencies {
// testImplementation project(path: ':core', configuration: 'testArtifacts')
testImplementation project(":core")
// testImplementation project(":serialization")
testImplementation project(":node-api")
testImplementation project(":node")
testImplementation project(':core-test-utils')
testImplementation project(':test-utils')
testImplementation project(":node-driver")
testImplementation "junit:junit:$junit_version"
// testImplementation "org.junit.jupiter:junit-jupiter-api:$junit_jupiter_version"
testImplementation "org.assertj:assertj-core:$assertj_version"
// testImplementation "org.mockito.kotlin:mockito-kotlin:$mockito_kotlin_version"
// testImplementation "com.esotericsoftware:kryo:$kryo_version"
// testImplementation "com.google.guava:guava:$guava_version"
// testImplementation "io.netty:netty-common:$netty_version"
// testImplementation "org.slf4j:slf4j-api:$slf4j_version"
testImplementation "io.dropwizard.metrics:metrics-jmx:$metrics_version"
testImplementation 'io.atomix.copycat:copycat-client:1.2.3'
testImplementation 'io.atomix.copycat:copycat-server:1.2.3'
}
test {
enabled = true // Something is disabling the test
jvmArgs += [
"--add-opens=java.base/java.nio=ALL-UNNAMED"
]
}

View File

@ -26,9 +26,8 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.hamcrest.Matchers.instanceOf
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Assert.assertThat
import org.junit.Before
import org.junit.Rule
import org.junit.Test
@ -119,7 +118,7 @@ class RaftTransactionCommitLogTests {
states, txId, requestingPartyName.toString(), requestSignature, timeWindow
)
val commitError = client.submit(commitCommand).getOrThrow()
assertThat(commitError, instanceOf(NotaryError.TimeWindowInvalid::class.java))
assertThat(commitError).isInstanceOf(NotaryError.TimeWindowInvalid::class.java)
}
@Test(timeout=300_000)
@ -158,7 +157,7 @@ class RaftTransactionCommitLogTests {
val address = Address(myAddress.host, myAddress.port)
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(RaftNotarySchemaV1)))
databases.add(database)
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), { RaftUniquenessProvider.createMap(TestingNamedCacheFactory()) }) }
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC()) { RaftUniquenessProvider.createMap(TestingNamedCacheFactory()) } }
val server = CopycatServer.builder(address)
.withStateMachine(stateMachineFactory)

View File

@ -1,4 +1,4 @@
package net.corda.node.services.transactions
package net.corda.notary.experimental.raft
import com.codahale.metrics.MetricRegistry
import net.corda.core.contracts.TimeWindow
@ -25,9 +25,6 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.common.BatchSignature
import net.corda.notary.experimental.raft.RaftConfig
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
import net.corda.notary.experimental.raft.RaftUniquenessProvider
import net.corda.notary.jpa.JPANotaryConfiguration
import net.corda.notary.jpa.JPANotarySchemaV1
import net.corda.notary.jpa.JPAUniquenessProvider

View File

@ -1,136 +0,0 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.lang.reflect.Constructor
import java.lang.reflect.Field
import java.util.LinkedHashMap
import java.util.LinkedHashSet
import java.util.LinkedList
/**
* The [LinkedHashMap] and [LinkedHashSet] have a problem with the default Quasar/Kryo serialisation
* in that serialising an iterator (and subsequent [LinkedHashMap.Entry]) over a sufficiently large
* data set can lead to a stack overflow (because the object map is traversed recursively).
*
* We've added our own custom serializer in order to ensure that the iterator is correctly deserialized.
*/
internal object LinkedHashMapIteratorSerializer : Serializer<Iterator<*>>() {
private val DUMMY_MAP = linkedMapOf(1L to 1)
private val outerMapField: Field = getIterator()::class.java.superclass.getDeclaredField("this$0").apply { isAccessible = true }
private val currentField: Field = getIterator()::class.java.superclass.getDeclaredField("current").apply { isAccessible = true }
private val KEY_ITERATOR_CLASS: Class<MutableIterator<Long>> = DUMMY_MAP.keys.iterator().javaClass
private val VALUE_ITERATOR_CLASS: Class<MutableIterator<Int>> = DUMMY_MAP.values.iterator().javaClass
private val MAP_ITERATOR_CLASS: Class<MutableIterator<MutableMap.MutableEntry<Long, Int>>> = DUMMY_MAP.iterator().javaClass
fun getIterator(): Any = DUMMY_MAP.iterator()
override fun write(kryo: Kryo, output: Output, obj: Iterator<*>) {
val current: Map.Entry<*, *>? = currentField.get(obj) as Map.Entry<*, *>?
kryo.writeClassAndObject(output, outerMapField.get(obj))
kryo.writeClassAndObject(output, current)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Iterator<*>>): Iterator<*> {
val outerMap = kryo.readClassAndObject(input) as Map<*, *>
return when (type) {
KEY_ITERATOR_CLASS -> {
val current = (kryo.readClassAndObject(input) as? Map.Entry<*, *>)?.key
outerMap.keys.iterator().returnToIteratorLocation(kryo, current)
}
VALUE_ITERATOR_CLASS -> {
val current = (kryo.readClassAndObject(input) as? Map.Entry<*, *>)?.value
outerMap.values.iterator().returnToIteratorLocation(kryo, current)
}
MAP_ITERATOR_CLASS -> {
val current = (kryo.readClassAndObject(input) as? Map.Entry<*, *>)
outerMap.iterator().returnToIteratorLocation(kryo, current)
}
else -> throw IllegalStateException("Invalid type")
}
}
private fun Iterator<*>.returnToIteratorLocation(kryo: Kryo, current: Any?): Iterator<*> {
while (this.hasNext()) {
val key = this.next()
if (iteratedObjectsEqual(kryo, key, current)) break
}
return this
}
private fun iteratedObjectsEqual(kryo: Kryo, a: Any?, b: Any?): Boolean = if (a == null || b == null) {
a == b
} else {
a === b || mapEntriesEqual(kryo, a, b) || kryoOptimisesAwayReferencesButEqual(kryo, a, b)
}
/**
* Kryo can substitute brand new created instances for some types during deserialization, making the identity check fail.
* Fall back to equality for those.
*/
private fun kryoOptimisesAwayReferencesButEqual(kryo: Kryo, a: Any, b: Any) =
(!kryo.referenceResolver.useReferences(a.javaClass) && !kryo.referenceResolver.useReferences(b.javaClass) && a == b)
private fun mapEntriesEqual(kryo: Kryo, a: Any, b: Any) =
(a is Map.Entry<*, *> && b is Map.Entry<*, *> && iteratedObjectsEqual(kryo, a.key, b.key))
}
/**
* The [LinkedHashMap] and [LinkedHashSet] have a problem with the default Quasar/Kryo serialisation
* in that serialising an iterator (and subsequent [LinkedHashMap.Entry]) over a sufficiently large
* data set can lead to a stack overflow (because the object map is traversed recursively).
*
* We've added our own custom serializer in order to ensure that only the key/value are recorded.
* The rest of the list isn't required at this scope.
*/
object LinkedHashMapEntrySerializer : Serializer<Map.Entry<*, *>>() {
// Create a dummy map so that we can get the LinkedHashMap$Entry from it
// The element type of the map doesn't matter. The entry is all we want
private val DUMMY_MAP = linkedMapOf(1L to 1)
fun getEntry(): Any = DUMMY_MAP.entries.first()
private val constr: Constructor<*> = getEntry()::class.java.declaredConstructors.single().apply { isAccessible = true }
/**
* Kryo would end up serialising "this" entry, then serialise "this.after" recursively, leading to a very large stack.
* we'll skip that and just write out the key/value
*/
override fun write(kryo: Kryo, output: Output, obj: Map.Entry<*, *>) {
val e: Map.Entry<*, *> = obj
kryo.writeClassAndObject(output, e.key)
kryo.writeClassAndObject(output, e.value)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Map.Entry<*, *>>): Map.Entry<*, *> {
val key = kryo.readClassAndObject(input)
val value = kryo.readClassAndObject(input)
return constr.newInstance(0, key, value, null) as Map.Entry<*, *>
}
}
/**
* Also, add a [ListIterator] serializer to avoid more linked list issues.
*/
object LinkedListItrSerializer : Serializer<ListIterator<Any>>() {
// Create a dummy list so that we can get the ListItr from it
// The element type of the list doesn't matter. The iterator is all we want
private val DUMMY_LIST = LinkedList<Long>(listOf(1))
fun getListItr(): Any = DUMMY_LIST.listIterator()
private val outerListField: Field = getListItr()::class.java.getDeclaredField("this$0").apply { isAccessible = true }
override fun write(kryo: Kryo, output: Output, obj: ListIterator<Any>) {
kryo.writeClassAndObject(output, outerListField.get(obj))
output.writeInt(obj.nextIndex())
}
override fun read(kryo: Kryo, input: Input, type: Class<out ListIterator<Any>>): ListIterator<Any> {
val list = kryo.readClassAndObject(input) as LinkedList<*>
val index = input.readInt()
return list.listIterator(index)
}
}

View File

@ -6,10 +6,8 @@ import com.esotericsoftware.kryo.SerializerFactory
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.ClosureSerializer
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
import com.esotericsoftware.kryo.serializers.FieldSerializer
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.BitSetSerializer
import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer
@ -20,7 +18,6 @@ import de.javakaffee.kryoserializers.guava.ImmutableSortedSetSerializer
import net.corda.core.contracts.ContractAttachment
import net.corda.core.contracts.ContractClassName
import net.corda.core.contracts.PrivacySalt
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.AbstractAttachment
@ -40,29 +37,20 @@ import net.corda.core.utilities.toNonEmptySet
import net.corda.serialization.internal.DefaultWhitelist
import net.corda.serialization.internal.GeneratedAttachment
import net.corda.serialization.internal.MutableClassWhitelist
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPrivateKey
import org.bouncycastle.jcajce.provider.asymmetric.ec.BCECPublicKey
import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPrivateCrtKey
import org.bouncycastle.jcajce.provider.asymmetric.rsa.BCRSAPublicKey
import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PrivateKey
import org.bouncycastle.pqc.jcajce.provider.sphincs.BCSphincs256PublicKey
import org.objenesis.instantiator.ObjectInstantiator
import org.objenesis.strategy.InstantiatorStrategy
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import java.io.BufferedInputStream
import java.io.ByteArrayOutputStream
import java.io.FileInputStream
import java.io.InputStream
import java.lang.invoke.SerializedLambda
import java.lang.reflect.Modifier.isPublic
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
import java.security.cert.CertPath
import java.security.cert.X509Certificate
import java.util.*
import kotlin.collections.ArrayList
object DefaultKryoCustomizer {
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
@ -71,11 +59,7 @@ object DefaultKryoCustomizer {
fun customize(kryo: Kryo, publicKeySerializer: Serializer<PublicKey> = PublicKeySerializer): Kryo {
return kryo.apply {
isRegistrationRequired = false
references = true
// Needed because of https://github.com/EsotericSoftware/kryo/issues/864
setOptimizedGenerics(false)
val defaultFactoryConfig = FieldSerializer.FieldSerializerConfig()
// Take the safest route here and allow subclasses to have fields named the same as super classes.
defaultFactoryConfig.extendedFieldNames = true
@ -83,78 +67,60 @@ object DefaultKryoCustomizer {
// For checkpoints we still want all the synthetic fields. This allows inner classes to reference
// their parents after deserialization.
defaultFactoryConfig.ignoreSyntheticFields = false
kryo.setDefaultSerializer(SerializerFactory.FieldSerializerFactory(defaultFactoryConfig))
setDefaultSerializer(SerializerFactory.FieldSerializerFactory(defaultFactoryConfig))
instantiatorStrategy = CustomInstantiatorStrategy()
addDefaultSerializer(Iterator::class.java, object : SerializerFactory.BaseSerializerFactory<IteratorSerializer>() {
override fun newSerializer(kryo: Kryo, type: Class<*>): IteratorSerializer {
val config = CompatibleFieldSerializer.CompatibleFieldSerializerConfig().apply {
ignoreSyntheticFields = false
extendedFieldNames = true
}
return IteratorSerializer(type, CompatibleFieldSerializer(kryo, type, config))
}
})
// Required for HashCheckingStream (de)serialization.
// Note that return type should be specifically set to InputStream, otherwise it may not work,
// i.e. val aStream : InputStream = HashCheckingStream(...).
addDefaultSerializer(Iterator::class.java, IteratorSerializerFactory)
addDefaultSerializer(InputStream::class.java, InputStreamSerializer)
addDefaultSerializer(SerializeAsToken::class.java, SerializeAsTokenSerializer<SerializeAsToken>())
addDefaultSerializer(Logger::class.java, LoggerSerializer)
addDefaultSerializer(X509Certificate::class.java, X509CertificateSerializer)
addDefaultSerializer(CertPath::class.java, CertPathSerializer)
addDefaultSerializer(PrivateKey::class.java, PrivateKeySerializer)
addDefaultSerializer(PublicKey::class.java, publicKeySerializer)
// WARNING: reordering the registrations here will cause a change in the serialized form, since classes
// with custom serializers get written as registration ids. This will break backwards-compatibility.
// Please add any new registrations to the end.
addDefaultSerializer(LinkedHashMapIteratorSerializer.getIterator()::class.java.superclass, LinkedHashMapIteratorSerializer)
register(LinkedHashMapEntrySerializer.getEntry()::class.java, LinkedHashMapEntrySerializer)
register(LinkedListItrSerializer.getListItr()::class.java, LinkedListItrSerializer)
register(Arrays.asList("").javaClass, ArraysAsListSerializer())
registerIfPackageOpen(linkedMapOf(1 to 1).entries.first()::class.java, { LinkedHashMapEntrySerializer }, fallbackWrite = false)
register(LazyMappedList::class.java, LazyMappedListSerializer)
register(SignedTransaction::class.java, SignedTransactionSerializer)
register(WireTransaction::class.java, WireTransactionSerializer)
register(SerializedBytes::class.java, SerializedBytesSerializer)
if (Collections::class.java.isPackageOpen) {
UnmodifiableCollectionsSerializer.registerSerializers(this)
} else {
registerAsInaccessible(Collections.unmodifiableCollection(listOf("")).javaClass)
registerAsInaccessible(Collections.unmodifiableList(ArrayList<Any>()).javaClass)
registerAsInaccessible(Collections.unmodifiableList(LinkedList<Any>()).javaClass)
registerAsInaccessible(Collections.unmodifiableSet(HashSet<Any>()).javaClass)
registerAsInaccessible(Collections.unmodifiableSortedSet(TreeSet<Any>()).javaClass)
registerAsInaccessible(Collections.unmodifiableMap(HashMap<Any, Any>()).javaClass)
registerAsInaccessible(Collections.unmodifiableSortedMap(TreeMap<Any, Any>()).javaClass)
}
ImmutableListSerializer.registerSerializers(this)
ImmutableSetSerializer.registerSerializers(this)
ImmutableSortedSetSerializer.registerSerializers(this)
ImmutableMapSerializer.registerSerializers(this)
ImmutableMultimapSerializer.registerSerializers(this)
// InputStream subclasses whitelisting, required for attachments.
register(BufferedInputStream::class.java, InputStreamSerializer)
register(Class.forName("sun.net.www.protocol.jar.JarURLConnection\$JarURLInputStream"), InputStreamSerializer)
register(PublicKey::class.java, publicKeySerializer)
register(PrivateKey::class.java, PrivateKeySerializer)
register(EdDSAPublicKey::class.java, publicKeySerializer)
register(EdDSAPrivateKey::class.java, PrivateKeySerializer)
register(CompositeKey::class.java, publicKeySerializer) // Using a custom serializer for compactness
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(Array<StackTraceElement>::class, read = { _, _ -> emptyArray() }, write = { _, _, _ -> })
// This ensures a NonEmptySetSerializer is constructed with an initial value.
register(NonEmptySet::class.java, NonEmptySetSerializer)
register(BitSet::class.java, BitSetSerializer())
register(Class::class.java, ClassSerializer)
register(FileInputStream::class.java, InputStreamSerializer)
register(CertPath::class.java, CertPathSerializer)
register(BCECPrivateKey::class.java, PrivateKeySerializer)
register(BCECPublicKey::class.java, publicKeySerializer)
register(BCRSAPrivateCrtKey::class.java, PrivateKeySerializer)
register(BCRSAPublicKey::class.java, publicKeySerializer)
register(BCSphincs256PrivateKey::class.java, PrivateKeySerializer)
register(BCSphincs256PublicKey::class.java, publicKeySerializer)
register(NotaryChangeWireTransaction::class.java, NotaryChangeWireTransactionSerializer)
register(PartyAndCertificate::class.java, PartyAndCertificateSerializer)
// Don't deserialize PrivacySalt via its default constructor.
register(PrivacySalt::class.java, PrivacySaltSerializer)
register(KeyPair::class.java, KeyPairSerializer)
// Used by the remote verifier, and will possibly be removed in future.
register(ContractAttachment::class.java, ContractAttachmentSerializer)
register(java.lang.invoke.SerializedLambda::class.java)
registerIfPackageOpen(SerializedLambda::class.java, fallbackWrite = false)
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
register(ContractUpgradeWireTransaction::class.java, ContractUpgradeWireTransactionSerializer)
register(ContractUpgradeFilteredTransaction::class.java, ContractUpgradeFilteredTransactionSerializer)

View File

@ -1,52 +0,0 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.lang.reflect.Field
class IteratorSerializer(type: Class<*>, private val serializer: Serializer<Iterator<*>>) : Serializer<Iterator<*>>(false, false) {
private val iterableReferenceField = findField(type, "this\$0")?.apply { isAccessible = true }
private val expectedModCountField = findField(type, "expectedModCount")?.apply { isAccessible = true }
private val iterableReferenceFieldType = iterableReferenceField?.type
private val modCountField = when (iterableReferenceFieldType) {
null -> null
else -> findField(iterableReferenceFieldType, "modCount")?.apply { isAccessible = true }
}
override fun write(kryo: Kryo, output: Output, obj: Iterator<*>) {
serializer.write(kryo, output, obj)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Iterator<*>>): Iterator<*> {
val iterator = serializer.read(kryo, input, type)
return fixIterator(iterator)
}
private fun fixIterator(iterator: Iterator<*>) : Iterator<*> {
// Set expectedModCount of iterator
val iterableInstance = iterableReferenceField?.get(iterator) ?: return iterator
val modCountValue = modCountField?.getInt(iterableInstance) ?: return iterator
expectedModCountField?.setInt(iterator, modCountValue)
return iterator
}
/**
* Find field in clazz or any superclass
*/
private fun findField(clazz: Class<*>, fieldName: String): Field? {
return clazz.declaredFields.firstOrNull { x -> x.name == fieldName } ?: when {
clazz.superclass != null -> {
// Look in superclasses
findField(clazz.superclass, fieldName)
}
else -> null // Not found
}
}
}

View File

@ -0,0 +1,64 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
import net.corda.core.internal.fullyQualifiedPackage
import java.util.Collections
import java.util.LinkedList
object IteratorSerializerFactory : SerializerFactory.BaseSerializerFactory<Serializer<*>>() {
private val linkedListListIteratorClass = LinkedList<Any>().listIterator()::class.java
override fun newSerializer(kryo: Kryo, type: Class<*>): Serializer<out Iterator<*>> {
return when {
!type.isPackageOpen -> FallbackEmptyIteratorSerializer
type == linkedListListIteratorClass -> LinkedListListIteratorSerializer
else -> {
val config = CompatibleFieldSerializer.CompatibleFieldSerializerConfig().apply {
ignoreSyntheticFields = false
extendedFieldNames = true
}
CompatibleFieldSerializer(kryo, type, config)
}
}
}
private object LinkedListListIteratorSerializer : Serializer<ListIterator<*>>() {
private val outerListField = linkedListListIteratorClass.getDeclaredField("this$0").apply { isAccessible = true }
override fun write(kryo: Kryo, output: Output, obj: ListIterator<*>) {
kryo.writeClassAndObject(output, outerListField.get(obj))
output.writeInt(obj.nextIndex())
}
override fun read(kryo: Kryo, input: Input, type: Class<out ListIterator<*>>): ListIterator<*> {
val list = kryo.readClassAndObject(input) as LinkedList<*>
val index = input.readInt()
return list.listIterator(index)
}
}
private object FallbackEmptyIteratorSerializer : Serializer<Iterator<*>>() {
override fun write(kryo: Kryo, output: Output, obj: Iterator<*>) {
val hasNext = obj.hasNext()
output.writeBoolean(hasNext)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Iterator<*>>): Iterator<*> {
val hasNext = input.readBoolean()
if (hasNext) {
throw UnsupportedOperationException("Restoring checkpoints containing iterators is not supported in this test environment. " +
"If you wish to restore these checkpoints in your tests then use the out-of-process node driver, or add " +
"--add-opens=${type.fullyQualifiedPackage}=ALL-UNNAMED to the test JVM args.")
} else {
// If the iterator didn't have any elements left (which can happen commonly when iterating over a singleton collection) then
// there's no need to make a fuss. We can return an empty iterator and move on.
return Collections.emptyIterator<Any>()
}
}
}
}

View File

@ -1,8 +1,6 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.ClassResolver
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.SerializerFactory
@ -10,13 +8,13 @@ import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer
import com.esotericsoftware.kryo.serializers.FieldSerializer
import com.esotericsoftware.kryo.util.MapReferenceResolver
import net.corda.core.contracts.PrivacySalt
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.DigestService
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.internal.LazyMappedList
import net.corda.core.internal.fullyQualifiedPackage
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SerializedBytes
@ -28,25 +26,21 @@ import net.corda.core.transactions.NotaryChangeWireTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.SgxSupport
import net.corda.core.utilities.contextLogger
import net.corda.serialization.internal.serializationContextKey
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.InputStream
import java.lang.reflect.InvocationTargetException
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
import java.security.cert.CertPath
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.ThreadSafe
import kotlin.reflect.KClass
import kotlin.reflect.KMutableProperty
import kotlin.reflect.KParameter
import kotlin.reflect.full.memberProperties
import kotlin.reflect.full.primaryConstructor
import kotlin.reflect.jvm.isAccessible
import kotlin.reflect.jvm.javaType
/**
* Serialization utilities, using the Kryo framework with a custom serializer for immutable data classes and a dead
@ -72,94 +66,6 @@ object SerializedBytesSerializer : Serializer<SerializedBytes<Any>>() {
}
}
/**
* Serializes properties and deserializes by using the constructor. This assumes that all backed properties are
* set via the constructor and the class is immutable.
*/
class ImmutableClassSerializer<T : Any>(val klass: KClass<T>) : Serializer<T>() {
val props by lazy { klass.memberProperties.sortedBy { it.name } }
val propsByName by lazy { props.associateBy { it.name } }
val constructor by lazy { klass.primaryConstructor!! }
init {
// Verify that this class is immutable (all properties are final).
// We disable this check inside SGX as the reflection blows up.
if (!SgxSupport.isInsideEnclave) {
props.forEach {
require(it !is KMutableProperty<*>) { "$it mutable property of class: ${klass} is unsupported" }
}
}
}
// Just a utility to help us catch cases where nodes are running out of sync versions.
private fun hashParameters(params: List<KParameter>): Int {
return params.map {
(it.name ?: "") + it.index.toString() + it.type.javaType.typeName
}.hashCode()
}
override fun write(kryo: Kryo, output: Output, obj: T) {
output.writeVarInt(constructor.parameters.size, true)
output.writeInt(hashParameters(constructor.parameters))
for (param in constructor.parameters) {
val kProperty = propsByName[param.name!!]!!
kProperty.isAccessible = true
when (param.type.javaType.typeName) {
"int" -> output.writeVarInt(kProperty.get(obj) as Int, true)
"long" -> output.writeVarLong(kProperty.get(obj) as Long, true)
"short" -> output.writeShort(kProperty.get(obj) as Int)
"char" -> output.writeChar(kProperty.get(obj) as Char)
"byte" -> output.writeByte(kProperty.get(obj) as Byte)
"double" -> output.writeDouble(kProperty.get(obj) as Double)
"float" -> output.writeFloat(kProperty.get(obj) as Float)
"boolean" -> output.writeBoolean(kProperty.get(obj) as Boolean)
else -> try {
kryo.writeClassAndObject(output, kProperty.get(obj))
} catch (e: Exception) {
throw IllegalStateException("Failed to serialize ${param.name} in ${klass.qualifiedName}", e)
}
}
}
}
@Suppress("ComplexMethod")
override fun read(kryo: Kryo, input: Input, type: Class<out T>): T {
require(type.kotlin == klass)
val numFields = input.readVarInt(true)
val fieldTypeHash = input.readInt()
// A few quick checks for data evolution. Note that this is not guaranteed to catch every problem! But it's
// good enough for a prototype.
if (numFields != constructor.parameters.size)
throw KryoException("Mismatch between number of constructor parameters and number of serialised fields " +
"for ${klass.qualifiedName} ($numFields vs ${constructor.parameters.size})")
if (fieldTypeHash != hashParameters(constructor.parameters))
throw KryoException("Hashcode mismatch for parameter types for ${klass.qualifiedName}: unsupported type evolution has happened.")
val args = arrayOfNulls<Any?>(numFields)
var cursor = 0
for (param in constructor.parameters) {
args[cursor++] = when (param.type.javaType.typeName) {
"int" -> input.readVarInt(true)
"long" -> input.readVarLong(true)
"short" -> input.readShort()
"char" -> input.readChar()
"byte" -> input.readByte()
"double" -> input.readDouble()
"float" -> input.readFloat()
"boolean" -> input.readBoolean()
else -> kryo.readClassAndObject(input)
}
}
// If the constructor throws an exception, pass it through instead of wrapping it.
return try {
constructor.call(*args)
} catch (e: InvocationTargetException) {
throw e.cause!!
}
}
}
// TODO This is a temporary inefficient serializer for sending InputStreams through RPC. This may be done much more
// efficiently using Artemis's large message feature.
object InputStreamSerializer : Serializer<InputStream>() {
@ -187,7 +93,7 @@ object InputStreamSerializer : Serializer<InputStream>() {
chunks.add(chunk)
}
}
val flattened = ByteArray(chunks.sumBy { it.size })
val flattened = ByteArray(chunks.sumOf { it.size })
var offset = 0
for (chunk in chunks) {
System.arraycopy(chunk, 0, flattened, offset, chunk.size)
@ -198,16 +104,6 @@ object InputStreamSerializer : Serializer<InputStream>() {
}
inline fun <T> Kryo.useClassLoader(cl: ClassLoader, body: () -> T): T {
val tmp = this.classLoader ?: ClassLoader.getSystemClassLoader()
this.classLoader = cl
try {
return body()
} finally {
this.classLoader = tmp
}
}
fun Output.writeBytesWithLength(byteArray: ByteArray) {
this.writeInt(byteArray.size, true)
this.writeBytes(byteArray)
@ -301,8 +197,8 @@ object PrivateKeySerializer : Serializer<PrivateKey>() {
}
override fun read(kryo: Kryo, input: Input, type: Class<out PrivateKey>): PrivateKey {
val A = input.readBytesWithLength()
return Crypto.decodePrivateKey(A)
val encodedKey = input.readBytesWithLength()
return Crypto.decodePrivateKey(encodedKey)
}
}
@ -315,63 +211,8 @@ object PublicKeySerializer : Serializer<PublicKey>() {
}
override fun read(kryo: Kryo, input: Input, type: Class<out PublicKey>): PublicKey {
val A = input.readBytesWithLength()
return Crypto.decodePublicKey(A)
}
}
/**
* Helper function for reading lists with number of elements at the beginning.
* @param minLen minimum number of elements we expect for list to include, defaults to 1
* @param expectedLen expected length of the list, defaults to null if arbitrary length list read
*/
inline fun <reified T> readListOfLength(kryo: Kryo, input: Input, minLen: Int = 1, expectedLen: Int? = null): List<T> {
val elemCount = input.readInt()
if (elemCount < minLen) throw KryoException("Cannot deserialize list, too little elements. Minimum required: $minLen, got: $elemCount")
if (expectedLen != null && elemCount != expectedLen)
throw KryoException("Cannot deserialize list, expected length: $expectedLen, got: $elemCount.")
return (1..elemCount).map { kryo.readClassAndObject(input) as T }
}
/**
* We need to disable whitelist checking during calls from our Kryo code to register a serializer, since it checks
* for existing registrations and then will enter our [CordaClassResolver.getRegistration] method.
*/
open class CordaKryo(classResolver: ClassResolver) : Kryo(classResolver, MapReferenceResolver()) {
override fun register(type: Class<*>?): Registration {
(classResolver as? CordaClassResolver)?.disableWhitelist()
try {
return super.register(type)
} finally {
(classResolver as? CordaClassResolver)?.enableWhitelist()
}
}
override fun register(type: Class<*>?, id: Int): Registration {
(classResolver as? CordaClassResolver)?.disableWhitelist()
try {
return super.register(type, id)
} finally {
(classResolver as? CordaClassResolver)?.enableWhitelist()
}
}
override fun register(type: Class<*>?, serializer: Serializer<*>?): Registration {
(classResolver as? CordaClassResolver)?.disableWhitelist()
try {
return super.register(type, serializer)
} finally {
(classResolver as? CordaClassResolver)?.enableWhitelist()
}
}
override fun register(registration: Registration?): Registration {
(classResolver as? CordaClassResolver)?.disableWhitelist()
try {
return super.register(registration)
} finally {
(classResolver as? CordaClassResolver)?.enableWhitelist()
}
val encodedKey = input.readBytesWithLength()
return Crypto.decodePublicKey(encodedKey)
}
}
@ -388,23 +229,60 @@ inline fun <T : Any> Kryo.register(
)
}
internal val Class<*>.isPackageOpen: Boolean get() = module.isOpen(packageName, KryoCheckpointSerializer::class.java.module)
/**
* Use this method to mark any types which can have the same instance within it more than once. This will make sure
* the serialised form is stable across multiple serialise-deserialise cycles. Using this on a type with internal cyclic
* references will throw a stack overflow exception during serialisation.
*
*/
inline fun <reified T : Any> Kryo.noReferencesWithin() {
register(T::class.java, NoReferencesSerializer(getSerializer(T::class.java)))
fun Kryo.registerIfPackageOpen(type: Class<*>, createSerializer: () -> Serializer<*>, fallbackWrite: Boolean = true) {
val serializer = if (type.isPackageOpen) createSerializer() else serializerForInaccesible(type, fallbackWrite)
register(type, serializer)
}
class NoReferencesSerializer<T>(private val baseSerializer: Serializer<T>) : Serializer<T>() {
/**
*
*/
fun Kryo.registerIfPackageOpen(type: Class<*>, fallbackWrite: Boolean = true) {
if (type.isPackageOpen) {
register(type)
} else {
registerAsInaccessible(type, fallbackWrite)
}
}
override fun read(kryo: Kryo, input: Input, type: Class<out T>): T {
return kryo.withoutReferences { baseSerializer.read(kryo, input, type) }
/**
*
*/
fun Kryo.registerAsInaccessible(type: Class<*>, fallbackWrite: Boolean = true) {
register(type, serializerForInaccesible(type, fallbackWrite))
}
private fun Kryo.serializerForInaccesible(type: Class<*>, fallbackWrite: Boolean = true): Serializer<*> {
// Find the most specific serializer already registered to use for writing. This will be useful to make sure as much of the object
// graph is serialised and covered in the writing phase.
return InaccessibleSerializer<Any>(if (fallbackWrite) getSerializer(type) else null)
}
private class InaccessibleSerializer<T : Any>(private val fallbackWrite: Serializer<T>? = null) : Serializer<T>() {
companion object {
private val logger = contextLogger()
private val typesLogged = Collections.newSetFromMap<Class<*>>(ConcurrentHashMap())
}
override fun write(kryo: Kryo, output: Output, obj: T) {
kryo.withoutReferences { baseSerializer.write(kryo, output, obj) }
val type = obj.javaClass
if (typesLogged.add(type)) {
logger.warn("${type.fullyQualifiedPackage} is not open to this test environment and so ${type.name} objects are not " +
"supported in checkpoints. This will most likely not be an issue unless checkpoints are restored.")
}
fallbackWrite?.write(kryo, output, obj)
}
override fun read(kryo: Kryo, input: Input, type: Class<out T>): T {
throw UnsupportedOperationException("Restoring checkpoints containing ${type.name} objects is not supported in this test " +
"environment. If you wish to restore these checkpoints in your tests then use the out-of-process node driver, or add " +
"--add-opens=${type.fullyQualifiedPackage}=ALL-UNNAMED to the test JVM args.")
}
}
@ -490,7 +368,8 @@ class ThrowableSerializer<T>(kryo: Kryo, type: Class<T>) : Serializer<Throwable>
}
}
private val delegate: Serializer<Throwable> = uncheckedCast(SerializerFactory.ReflectionSerializerFactory.newSerializer(kryo, FieldSerializer::class.java, type)) as Serializer<Throwable>
@Suppress("UNCHECKED_CAST")
private val delegate: Serializer<Throwable> = SerializerFactory.ReflectionSerializerFactory.newSerializer(kryo, FieldSerializer::class.java, type) as Serializer<Throwable>
override fun write(kryo: Kryo, output: Output, throwable: Throwable) {
delegate.write(kryo, output, throwable)
@ -509,9 +388,22 @@ class ThrowableSerializer<T>(kryo: Kryo, type: Class<T>) : Serializer<Throwable>
/** For serializing the utility [LazyMappedList]. It will serialize the fully resolved object.*/
@ThreadSafe
@SuppressWarnings("ALL")
object LazyMappedListSerializer : Serializer<List<*>>() {
// Using a MutableList so that Kryo will always write an instance of java.util.ArrayList.
override fun write(kryo: Kryo, output: Output, obj: List<*>) = kryo.writeClassAndObject(output, obj.toMutableList())
override fun read(kryo: Kryo, input: Input, type: Class<out List<*>>) = kryo.readClassAndObject(input) as? List<*>
}
object KeyPairSerializer : Serializer<KeyPair>() {
override fun write(kryo: Kryo, output: Output, obj: KeyPair) {
kryo.writeObject(output, obj.public)
kryo.writeObject(output, obj.private)
}
override fun read(kryo: Kryo, input: Input, type: Class<out KeyPair>): KeyPair {
return KeyPair(
kryo.readObject(input, PublicKey::class.java),
kryo.readObject(input, PrivateKey::class.java)
)
}
}

View File

@ -1,13 +1,21 @@
package net.corda.nodeapi.internal.serialization.kryo
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.io.serialization.kryo.CollectionsSetFromMapSerializer
import co.paralleluniverse.io.serialization.kryo.ExternalizableKryoSerializer
import co.paralleluniverse.io.serialization.kryo.JdkProxySerializer
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.io.serialization.kryo.ReferenceSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.ClosureSerializer
import com.esotericsoftware.kryo.serializers.DefaultSerializers
import com.esotericsoftware.kryo.util.MapReferenceResolver
import de.javakaffee.kryoserializers.GregorianCalendarSerializer
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.CheckpointCustomSerializer
import net.corda.core.serialization.ClassWhitelist
@ -25,7 +33,24 @@ import net.corda.serialization.internal.CordaSerializationMagic
import net.corda.serialization.internal.QuasarWhitelist
import net.corda.serialization.internal.SectionId
import net.corda.serialization.internal.encodingNotPermittedFormat
import java.io.Externalizable
import java.lang.ref.Reference
import java.lang.reflect.InvocationHandler
import java.net.URI
import java.util.Collections
import java.util.EnumMap
import java.util.EnumSet
import java.util.GregorianCalendar
import java.util.LinkedList
import java.util.TreeMap
import java.util.TreeSet
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.regex.Pattern
val kryoMagic = CordaSerializationMagic("corda".toByteArray() + byteArrayOf(0, 0))
@ -40,20 +65,27 @@ private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>()
override fun read(kryo: Kryo, input: Input, type: Class<out AutoCloseable>) = throw IllegalStateException("Should not reach here!")
}
private object FutureSerialisationDetector : Serializer<Future<*>>() {
override fun write(kryo: Kryo, output: Output, future: Future<*>) {
val message = "${future.javaClass.name}, which is a Future, has been detected during flow checkpointing. " +
"Restoring Futures across node restarts is not supported. Make sure code accessing it is " +
"confined to a private method or the reference is nulled out."
throw UnsupportedOperationException(message)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Future<*>>) = throw IllegalStateException("Should not reach here!")
}
object KryoCheckpointSerializer : CheckpointSerializer {
private val kryoPoolsForContexts = ConcurrentHashMap<Triple<ClassWhitelist, ClassLoader, Iterable<CheckpointCustomSerializer<*,*>>>, KryoPool>()
private fun getPool(context: CheckpointSerializationContext): KryoPool {
return kryoPoolsForContexts.computeIfAbsent(Triple(context.whitelist, context.deserializationClassLoader, context.checkpointCustomSerializers)) {
KryoPool {
val classResolver = CordaClassResolver(context)
val serializer = Fiber.getFiberSerializer(classResolver,false) as KryoSerializer
// TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
val serializer = createFiberSerializer(context)
serializer.kryo.apply {
field.set(this, classResolver)
// don't allow overriding the public key serializer for checkpointing
DefaultKryoCustomizer.customize(this)
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
addDefaultSerializer(Future::class.java, FutureSerialisationDetector)
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
classLoader = it.second
@ -62,12 +94,75 @@ object KryoCheckpointSerializer : CheckpointSerializer {
warnAboutDuplicateSerializers(customSerializers)
val classToSerializer = mapInputClassToCustomSerializer(context.deserializationClassLoader, customSerializers)
addDefaultCustomSerializers(this, classToSerializer)
referenceResolver
registerCommonClasses(this)
}
}
}
}
fun createFiberSerializer(context: CheckpointSerializationContext): KryoSerializer {
// val serializer = Fiber.getFiberSerializer(classResolver, false) as KryoSerializer
// (this as ReplaceableObjectKryo).isIgnoreInaccessibleClasses = true
val kryo = Kryo(CordaClassResolver(context), MapReferenceResolver())
kryo.isRegistrationRequired = false
// Needed because of https://github.com/EsotericSoftware/kryo/issues/864
kryo.setOptimizedGenerics(false)
DefaultKryoCustomizer.customize(kryo)
return Fiber.getFiberSerializer(kryo, false) as KryoSerializer
}
/**
* Copy of [co.paralleluniverse.io.serialization.kryo.KryoUtil.registerCommonClasses] ...
*/
fun registerCommonClasses(kryo: Kryo) {
kryo.register(BooleanArray::class.java)
kryo.register(ByteArray::class.java)
kryo.register(ShortArray::class.java)
kryo.register(CharArray::class.java)
kryo.register(IntArray::class.java)
kryo.register(FloatArray::class.java)
kryo.register(LongArray::class.java)
kryo.register(DoubleArray::class.java)
kryo.register(Array<String>::class.java)
kryo.register(Array<IntArray>::class.java)
kryo.register(ArrayList::class.java)
kryo.register(LinkedList::class.java)
kryo.register(HashMap::class.java)
kryo.register(LinkedHashMap::class.java)
kryo.register(TreeMap::class.java)
kryo.register(EnumMap::class.java)
kryo.register(HashSet::class.java)
kryo.register(LinkedHashSet::class.java)
kryo.register(TreeSet::class.java)
kryo.register(EnumSet::class.java)
kryo.registerIfPackageOpen(Collections.newSetFromMap(emptyMap<Any, Boolean>()).javaClass, ::CollectionsSetFromMapSerializer)
if (GregorianCalendar::class.java.isPackageOpen) {
// If possible register a more efficient serializer for GregorianCalendar, otherwise a default serializer is already registered.
kryo.register(GregorianCalendar::class.java, GregorianCalendarSerializer())
}
kryo.register(InvocationHandler::class.java, JdkProxySerializer())
if (Collections::class.java.isPackageOpen) {
SynchronizedCollectionsSerializer.registerSerializers(kryo)
} else {
kryo.registerAsInaccessible(Collections.synchronizedCollection(listOf(1)).javaClass)
kryo.registerAsInaccessible(Collections.synchronizedList(ArrayList<Any>()).javaClass)
kryo.registerAsInaccessible(Collections.synchronizedList(LinkedList<Any>()).javaClass)
kryo.registerAsInaccessible(Collections.synchronizedSet(HashSet<Any>()).javaClass)
kryo.registerAsInaccessible(Collections.synchronizedSortedSet(TreeSet<Any>()).javaClass)
kryo.registerAsInaccessible(Collections.synchronizedMap(HashMap<Any, Any>()).javaClass)
kryo.registerAsInaccessible(Collections.synchronizedSortedMap(TreeMap<Any, Any>()).javaClass)
}
kryo.addDefaultSerializer(Externalizable::class.java, ExternalizableKryoSerializer<Externalizable>())
kryo.addDefaultSerializer(Reference::class.java, ReferenceSerializer())
kryo.addDefaultSerializer(URI::class.java, DefaultSerializers.URISerializer::class.java)
kryo.addDefaultSerializer(UUID::class.java, DefaultSerializers.UUIDSerializer::class.java)
kryo.addDefaultSerializer(AtomicBoolean::class.java, DefaultSerializers.AtomicBooleanSerializer::class.java)
kryo.addDefaultSerializer(AtomicInteger::class.java, DefaultSerializers.AtomicIntegerSerializer::class.java)
kryo.addDefaultSerializer(AtomicLong::class.java, DefaultSerializers.AtomicLongSerializer::class.java)
kryo.addDefaultSerializer(Pattern::class.java, DefaultSerializers.PatternSerializer::class.java)
}
/**
* Returns a sorted list of CustomSerializerCheckpointAdaptor based on the custom serializers inside context.
*

View File

@ -13,7 +13,7 @@ import java.io.SequenceInputStream
private val serializationBufferPool = LazyPool(
newInstance = { ByteArray(64 * 1024) })
internal fun <T> kryoInput(underlying: InputStream, task: Input.() -> T): T {
fun <T> kryoInput(underlying: InputStream, task: Input.() -> T): T {
return serializationBufferPool.run {
Input(it).use { input ->
input.inputStream = underlying
@ -22,7 +22,7 @@ internal fun <T> kryoInput(underlying: InputStream, task: Input.() -> T): T {
}
}
internal fun <T> kryoOutput(task: Output.() -> T): ByteArray {
fun <T> kryoOutput(task: Output.() -> T): ByteArray {
return byteArrayOutput { underlying ->
serializationBufferPool.run {
Output(it).use { output ->
@ -33,11 +33,11 @@ internal fun <T> kryoOutput(task: Output.() -> T): ByteArray {
}
}
internal fun Output.substitute(transform: (OutputStream) -> OutputStream) {
fun Output.substitute(transform: (OutputStream) -> OutputStream) {
flush()
outputStream = transform(outputStream)
}
internal fun Input.substitute(transform: (InputStream) -> InputStream) {
fun Input.substitute(transform: (InputStream) -> InputStream) {
inputStream = transform(SequenceInputStream(buffer.copyOfRange(position(), limit()).inputStream(), inputStream))
}

View File

@ -0,0 +1,36 @@
package net.corda.nodeapi.internal.serialization.kryo
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
/**
* The [LinkedHashMap] and [LinkedHashSet] have a problem with the default Quasar/Kryo serialisation
* in that serialising an iterator (and subsequent [LinkedHashMap.Entry]) over a sufficiently large
* data set can lead to a stack overflow (because the object map is traversed recursively).
*
* We've added our own custom serializer in order to ensure that only the key/value are recorded.
* The rest of the list isn't required at this scope.
*/
object LinkedHashMapEntrySerializer : Serializer<Map.Entry<*, *>>() {
// Create a dummy map so that we can get the LinkedHashMap$Entry from it
// The element type of the map doesn't matter. The entry is all we want
private val constructor = linkedMapOf(1L to 1).entries.first()::class.java.declaredConstructors.single().apply { isAccessible = true }
/**
* Kryo would end up serialising "this" entry, then serialise "this.after" recursively, leading to a very large stack.
* we'll skip that and just write out the key/value
*/
override fun write(kryo: Kryo, output: Output, obj: Map.Entry<*, *>) {
val e: Map.Entry<*, *> = obj
kryo.writeClassAndObject(output, e.key)
kryo.writeClassAndObject(output, e.value)
}
override fun read(kryo: Kryo, input: Input, type: Class<out Map.Entry<*, *>>): Map.Entry<*, *> {
val key = kryo.readClassAndObject(input)
val value = kryo.readClassAndObject(input)
return constructor.newInstance(0, key, value, null) as Map.Entry<*, *>
}
}

View File

@ -1,122 +0,0 @@
package net.corda.nodeapi.internal.serialization.kryo
import org.mockito.kotlin.doReturn
import org.mockito.kotlin.whenever
import net.corda.core.serialization.EncodingWhitelist
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.coretesting.internal.rigorousMock
import net.corda.serialization.internal.AllWhitelist
import net.corda.serialization.internal.CheckpointSerializationContextImpl
import net.corda.serialization.internal.CordaSerializationEncoding
import net.corda.testing.core.internal.CheckpointSerializationEnvironmentRule
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import java.util.*
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.collections.HashSet
import kotlin.collections.LinkedHashMap
import kotlin.collections.LinkedHashSet
@RunWith(Parameterized::class)
class ArrayListItrConcurrentModificationException(private val compression: CordaSerializationEncoding?) {
companion object {
@Parameters(name = "{0}")
@JvmStatic
fun compression() = arrayOf<CordaSerializationEncoding?>(null) + CordaSerializationEncoding.values()
}
@get:Rule
val serializationRule = CheckpointSerializationEnvironmentRule(inheritable = true)
private lateinit var context: CheckpointSerializationContext
@Before
fun setup() {
context = CheckpointSerializationContextImpl(
deserializationClassLoader = javaClass.classLoader,
whitelist = AllWhitelist,
properties = emptyMap(),
objectReferencesEnabled = true,
encoding = compression,
encodingWhitelist = rigorousMock<EncodingWhitelist>().also {
if (compression != null) doReturn(true).whenever(it).acceptEncoding(compression)
})
}
@Test(timeout=300_000)
fun `ArrayList iterator can checkpoint without error`() {
runTestWithCollection(ArrayList())
}
@Test(timeout=300_000)
fun `HashSet iterator can checkpoint without error`() {
runTestWithCollection(HashSet())
}
@Test(timeout=300_000)
fun `LinkedHashSet iterator can checkpoint without error`() {
runTestWithCollection(LinkedHashSet())
}
@Test(timeout=300_000)
fun `HashMap iterator can checkpoint without error`() {
runTestWithCollection(HashMap())
}
@Test(timeout=300_000)
fun `LinkedHashMap iterator can checkpoint without error`() {
runTestWithCollection(LinkedHashMap())
}
@Test(timeout=300_000)
fun `LinkedList iterator can checkpoint without error`() {
runTestWithCollection(LinkedList())
}
private data class TestCheckpoint<C,I>(val list: C, val iterator: I)
private fun runTestWithCollection(collection: MutableCollection<Int>) {
for (i in 1..100) {
collection.add(i)
}
val iterator = collection.iterator()
iterator.next()
val checkpoint = TestCheckpoint(collection, iterator)
val serializedBytes = checkpoint.checkpointSerialize(context)
val deserializedCheckpoint = serializedBytes.checkpointDeserialize(context)
assertThat(deserializedCheckpoint.list).isEqualTo(collection)
assertThat(deserializedCheckpoint.iterator.next()).isEqualTo(2)
assertThat(deserializedCheckpoint.iterator.hasNext()).isTrue()
}
private fun runTestWithCollection(collection: MutableMap<Int, Int>) {
for (i in 1..100) {
collection[i] = i
}
val iterator = collection.iterator()
iterator.next()
val checkpoint = TestCheckpoint(collection, iterator)
val serializedBytes = checkpoint.checkpointSerialize(context)
val deserializedCheckpoint = serializedBytes.checkpointDeserialize(context)
assertThat(deserializedCheckpoint.list).isEqualTo(collection)
assertThat(deserializedCheckpoint.iterator.next().key).isEqualTo(2)
assertThat(deserializedCheckpoint.iterator.hasNext()).isTrue()
}
}

View File

@ -1,171 +0,0 @@
package net.corda.nodeapi.internal.serialization.kryo
import org.junit.Ignore
import org.junit.Test
import org.junit.jupiter.api.assertDoesNotThrow
import java.util.LinkedList
import kotlin.test.assertEquals
class KryoCheckpointTest {
private val testSize = 1000L
/**
* This test just ensures that the checkpoints still work in light of [LinkedHashMapEntrySerializer].
*/
@Test(timeout=300_000)
fun `linked hash map can checkpoint without error`() {
var lastKey = ""
val dummyMap = linkedMapOf<String, Long>()
for (i in 0..testSize) {
dummyMap[i.toString()] = i
}
var it = dummyMap.iterator()
while (it.hasNext()) {
lastKey = it.next().key
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(testSize.toString(), lastKey)
}
@Test(timeout=300_000)
fun `empty linked hash map can checkpoint without error`() {
val dummyMap = linkedMapOf<String, Long>()
val it = dummyMap.iterator()
val itKeys = dummyMap.keys.iterator()
val itValues = dummyMap.values.iterator()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
val bytesKeys = KryoCheckpointSerializer.serialize(itKeys, KRYO_CHECKPOINT_CONTEXT)
val bytesValues = KryoCheckpointSerializer.serialize(itValues, KRYO_CHECKPOINT_CONTEXT)
assertDoesNotThrow {
KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
KryoCheckpointSerializer.deserialize(bytesKeys, itKeys.javaClass, KRYO_CHECKPOINT_CONTEXT)
KryoCheckpointSerializer.deserialize(bytesValues, itValues.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
}
@Test(timeout=300_000)
fun `linked hash map with null values can checkpoint without error`() {
val dummyMap = linkedMapOf<String?, Long?>().apply {
put("foo", 2L)
put(null, null)
put("bar", 3L)
}
val it = dummyMap.iterator()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
val itKeys = dummyMap.keys.iterator()
itKeys.next()
itKeys.next()
val bytesKeys = KryoCheckpointSerializer.serialize(itKeys, KRYO_CHECKPOINT_CONTEXT)
val itValues = dummyMap.values.iterator()
val bytesValues = KryoCheckpointSerializer.serialize(itValues, KRYO_CHECKPOINT_CONTEXT)
assertDoesNotThrow {
KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
val desItKeys = KryoCheckpointSerializer.deserialize(bytesKeys, itKeys.javaClass, KRYO_CHECKPOINT_CONTEXT)
assertEquals("bar", desItKeys.next())
KryoCheckpointSerializer.deserialize(bytesValues, itValues.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
}
@Test(timeout=300_000)
fun `linked hash map keys can checkpoint without error`() {
var lastKey = ""
val dummyMap = linkedMapOf<String, Long>()
for (i in 0..testSize) {
dummyMap[i.toString()] = i
}
var it = dummyMap.keys.iterator()
while (it.hasNext()) {
lastKey = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(testSize.toString(), lastKey)
}
@Test(timeout=300_000)
fun `linked hash map values can checkpoint without error`() {
var lastValue = 0L
val dummyMap = linkedMapOf<String, Long>()
for (i in 0..testSize) {
dummyMap[i.toString()] = i
}
var it = dummyMap.values.iterator()
while (it.hasNext()) {
lastValue = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(testSize, lastValue)
}
@Test(timeout = 300_000)
fun `linked hash map values can checkpoint without error, even with repeats`() {
var lastValue = "0"
val dummyMap = linkedMapOf<String, String>()
for (i in 0..testSize) {
dummyMap[i.toString()] = (i % 10).toString()
}
var it = dummyMap.values.iterator()
while (it.hasNext()) {
lastValue = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals((testSize % 10).toString(), lastValue)
}
@Ignore("Kryo optimizes boxed primitives so this does not work. Need to customise ReferenceResolver to stop it doing it.")
@Test(timeout = 300_000)
fun `linked hash map values can checkpoint without error, even with repeats for boxed primitives`() {
var lastValue = 0L
val dummyMap = linkedMapOf<String, Long>()
for (i in 0..testSize) {
dummyMap[i.toString()] = (i % 10)
}
var it = dummyMap.values.iterator()
while (it.hasNext()) {
lastValue = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(testSize % 10, lastValue)
}
/**
* This test just ensures that the checkpoints still work in light of [LinkedHashMapEntrySerializer].
*/
@Test(timeout=300_000)
fun `linked hash set can checkpoint without error`() {
var result: Any = 0L
val dummySet = linkedSetOf<Any>().apply { addAll(0..testSize) }
var it = dummySet.iterator()
while (it.hasNext()) {
result = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(testSize, result)
}
/**
* This test just ensures that the checkpoints still work in light of [LinkedListItrSerializer].
*/
@Test(timeout=300_000)
fun `linked list can checkpoint without error`() {
var result: Any = 0L
val dummyList = LinkedList<Long>().apply { addAll(0..testSize) }
var it = dummyList.iterator()
while (it.hasNext()) {
result = it.next()
val bytes = KryoCheckpointSerializer.serialize(it, KRYO_CHECKPOINT_CONTEXT)
it = KryoCheckpointSerializer.deserialize(bytes, it.javaClass, KRYO_CHECKPOINT_CONTEXT)
}
assertEquals(testSize, result)
}
}

View File

@ -1,9 +1,3 @@
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.security=ALL-UNNAMED
--add-opens=java.base/java.security.cert=ALL-UNNAMED
--add-opens=java.base/java.time=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.sql/java.sql=ALL-UNNAMED

View File

@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.Contract
@ -58,7 +57,6 @@ import org.objenesis.strategy.StdInstantiatorStrategy
import java.io.ByteArrayOutputStream
import java.lang.reflect.Modifier
import java.security.PublicKey
import java.util.Arrays
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@ -305,8 +303,6 @@ class CustomSerializationSchemeDriverTest {
kryo.isRegistrationRequired = false
kryo.instantiatorStrategy = CustomInstantiatorStrategy()
kryo.classLoader = classLoader
@Suppress("ReplaceJavaStaticMethodWithKotlinAnalog")
kryo.register(Arrays.asList("").javaClass, ArraysAsListSerializer())
}
//Stolen from DefaultKryoCustomizer.kt

View File

@ -6,10 +6,10 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.schema.NodeSchemaService.NodeCoreV1
import net.corda.node.services.schema.PersistentStateServiceTests.TestSchema
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.InProcessImpl
@ -17,11 +17,8 @@ import net.corda.testing.internal.vault.DummyLinearStateSchemaV1
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.enclosedCordapp
import org.hibernate.annotations.Cascade
import org.hibernate.annotations.CascadeType
import org.junit.Ignore
import org.junit.Test
import javax.persistence.*
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@ -115,34 +112,4 @@ class NodeSchemaServiceTest {
return (this.serviceHub as ServiceHubInternal).schemaService.schemas.map { it.name }
}
}
class SchemaFamily
object TestSchema : MappedSchema(SchemaFamily::class.java, 1, setOf(Parent::class.java, Child::class.java)) {
@Entity
@Table(name = "Parents")
class Parent : PersistentState() {
@OneToMany(fetch = FetchType.LAZY)
@JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index"))
@OrderColumn
@Cascade(CascadeType.PERSIST)
var children: MutableSet<Child> = mutableSetOf()
}
@Suppress("unused")
@Entity
@Table(name = "Children")
class Child {
@Id
@GeneratedValue
@Column(name = "child_id", unique = true, nullable = false)
var childId: Int? = null
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index"))
var parent: Parent? = null
}
}
}

View File

@ -3,6 +3,7 @@ package net.corda.node.internal
import co.paralleluniverse.fibers.instrument.Retransform
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import com.google.common.collect.ImmutableList
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.MoreExecutors
import com.zaxxer.hikari.pool.HikariPool
@ -210,6 +211,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
@Suppress("LeakingThis")
private var tokenizableServices: MutableList<SerializeAsToken>? = mutableListOf(platformClock, this)
private var frozenTokenizableServices: List<SerializeAsToken>? = null
val metricRegistry = MetricRegistry()
protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize()
@ -361,10 +363,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private val nodeServicesContext = object : NodeServicesContext {
override val platformVersion = versionInfo.platformVersion
override val configurationWithOptions = configuration.configurationWithOptions
// Note: tokenizableServices passed by reference meaning that any subsequent modification to the content in the `AbstractNode` will
// be reflected in the context as well. However, since context only has access to immutable collection it can only read (but not modify)
// the content.
override val tokenizableServices: List<SerializeAsToken> = this@AbstractNode.tokenizableServices!!
override val tokenizableServices: List<SerializeAsToken> get() = this@AbstractNode.frozenTokenizableServices!!
}
private val nodeLifecycleEventsDistributor = NodeLifecycleEventsDistributor().apply { add(checkpointDumper) }
@ -483,6 +482,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
"Node's platform version is lower than network's required minimumPlatformVersion"
}
networkMapCache.start(netParams.notaries)
services.networkParameters = netParams
database.transaction {
networkParametersStorage.setCurrentParameters(signedNetParams, trustRoots)
@ -622,10 +622,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
vaultService.start()
ScheduledActivityObserver.install(vaultService, schedulerService, flowLogicRefFactory)
val frozenTokenizableServices = tokenizableServices!!
frozenTokenizableServices = ImmutableList.copyOf(tokenizableServices!!)
tokenizableServices = null
verifyCheckpointsCompatible(frozenTokenizableServices)
verifyCheckpointsCompatible(frozenTokenizableServices!!)
partyInfoCache.start()
encryptionService.start(nodeInfo.legalIdentities[0])
@ -634,7 +634,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
state machine manager from starting (just below this) until the service is ready.
*/
nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.BeforeStateMachineStart(nodeServicesContext)).get()
val callback = smm.start(frozenTokenizableServices)
val callback = smm.start(frozenTokenizableServices!!)
val smmStartedFuture = rootFuture.map { callback() }
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
@ -1205,8 +1205,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache get() = this@AbstractNode.attachmentsClassLoaderCache
@Volatile
private lateinit var _networkParameters: NetworkParameters
override val networkParameters: NetworkParameters get() = _networkParameters
override lateinit var networkParameters: NetworkParameters
init {
this@AbstractNode.attachments.servicesForResolution = this
@ -1214,7 +1213,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
fun start(myInfo: NodeInfo, networkParameters: NetworkParameters) {
this._myInfo = myInfo
this._networkParameters = networkParameters
this.networkParameters = networkParameters
}
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
@ -1296,7 +1295,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
override fun onNewNetworkParameters(networkParameters: NetworkParameters) {
this._networkParameters = networkParameters
this.networkParameters = networkParameters
}
override fun tryExternalVerification(stx: SignedTransaction, checkSufficientSignatures: Boolean): Boolean {

View File

@ -10,6 +10,7 @@ import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.toImmutableList
import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.core.serialization.SerializationContext
@ -41,7 +42,6 @@ import org.hibernate.annotations.Type
import rx.Observable
import rx.subjects.PublishSubject
import java.time.Instant
import java.util.Collections
import javax.persistence.AttributeConverter
import javax.persistence.Column
import javax.persistence.Convert
@ -398,20 +398,20 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
}
// Cache value type to just store the immutable bits of a signed transaction plus conversion helpers
internal class TxCacheValue(
private class TxCacheValue(
val txBits: SerializedBytes<CoreTransaction>,
val sigs: List<TransactionSignature>,
val status: TransactionStatus
) {
constructor(stx: SignedTransaction, status: TransactionStatus) : this(
stx.txBits,
Collections.unmodifiableList(stx.sigs),
stx.sigs.toImmutableList(),
status
)
constructor(stx: SignedTransaction, status: TransactionStatus, sigs: List<TransactionSignature>?) : this(
stx.txBits,
if (sigs == null) Collections.unmodifiableList(stx.sigs) else Collections.unmodifiableList(stx.sigs + sigs).distinct(),
if (sigs == null) stx.sigs.toImmutableList() else stx.sigs.toMutableSet().apply { addAll(sigs) }.toImmutableList(),
status
)
fun toSignedTx() = SignedTransaction(txBits, sigs)

View File

@ -32,11 +32,11 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.isIdempotentFlow
import net.corda.core.internal.location
import net.corda.core.internal.toPath
import net.corda.core.internal.uncheckedCast
import net.corda.core.internal.telemetry.ComponentTelemetryIds
import net.corda.core.internal.telemetry.SerializedTelemetry
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.internal.toPath
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -363,7 +363,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
errorAndTerminate("Caught unrecoverable error from flow. Forcibly terminating the JVM, this might leave resources open, and most likely will.", t)
}
logFlowError(t)
Try.Failure<R>(t)
Try.Failure(t)
}
val softLocksId = if (softLockedStates.isNotEmpty()) logic.runId.uuid else null
val finalEvent = when (resultOrError) {
@ -499,7 +499,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
permissionName,
permissionGranted)
serviceHub.auditService.recordAuditEvent(checkPermissionEvent)
@Suppress("ConstantConditionIf")
if (!permissionGranted) {
throw FlowPermissionException("User ${context.principal()} not permissioned for $permissionName on flow $id")
}
@ -540,7 +539,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val serializationContext = TransientReference(transientValues.checkpointSerializationContext)
val transaction = extractThreadLocalTransaction()
val telemetryIds = retrieveTelemetryIds()
parkAndSerialize { _, _ ->
parkAndCustomSerialize { _ ->
setLoggingContext()
logger.trace { "Suspended on $ioRequest" }

View File

@ -1,25 +1,41 @@
package net.corda.node.services.schema
import net.corda.core.contracts.*
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.coretesting.internal.rigorousMock
import net.corda.node.services.api.SchemaService
import net.corda.node.services.schema.NodeSchemaServiceTest.TestSchema
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.configureDatabase
import net.corda.coretesting.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.hibernate.annotations.Cascade
import org.hibernate.annotations.CascadeType
import org.junit.After
import org.junit.Before
import org.junit.Test
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.FetchType
import javax.persistence.GeneratedValue
import javax.persistence.Id
import javax.persistence.JoinColumn
import javax.persistence.JoinColumns
import javax.persistence.ManyToOne
import javax.persistence.OneToMany
import javax.persistence.OrderColumn
import javax.persistence.Table
import kotlin.test.assertEquals
class PersistentStateServiceTests {
@ -81,4 +97,32 @@ class PersistentStateServiceTests {
database.close()
}
class SchemaFamily
object TestSchema : MappedSchema(SchemaFamily::class.java, 1, setOf(Parent::class.java, Child::class.java)) {
@Entity
@Table(name = "Parents")
class Parent : PersistentState() {
@OneToMany(fetch = FetchType.LAZY)
@JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index"))
@OrderColumn
@Cascade(CascadeType.PERSIST)
var children: MutableSet<Child> = mutableSetOf()
}
@Suppress("unused")
@Entity
@Table(name = "Children")
class Child {
@Id
@GeneratedValue
@Column(name = "child_id", unique = true, nullable = false)
var childId: Int? = null
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumns(JoinColumn(name = "transaction_id", referencedColumnName = "transaction_id"), JoinColumn(name = "output_index", referencedColumnName = "output_index"))
var parent: Parent? = null
}
}
}

View File

@ -10,6 +10,9 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
@ -41,15 +44,15 @@ class FlowOperatorTests {
val EUGENE_NAME = CordaX500Name("Eugene", "EugeneCorp", "GB")
}
lateinit var mockNet: InternalMockNetwork
lateinit var aliceNode: TestStartedNode
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: TestStartedNode
private lateinit var aliceParty: Party
lateinit var bobNode: TestStartedNode
private lateinit var bobNode: TestStartedNode
private lateinit var bobParty: Party
lateinit var charlieNode: TestStartedNode
private lateinit var charlieNode: TestStartedNode
private lateinit var charlieParty: Party
lateinit var daveNode: TestStartedNode
lateinit var daveParty: Party
private lateinit var daveNode: TestStartedNode
private lateinit var daveParty: Party
private lateinit var eugeneNode: TestStartedNode
private lateinit var eugeneParty: Party
@ -216,8 +219,7 @@ class FlowOperatorTests {
fun `mixed query should return all flows which are waiting for counter party to process`() {
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { AcceptingFlow("Hello", it) }
val future = CompletableFuture<String>()
aliceNode.services.startFlow(ExternalAsyncOperationFlow(future))
aliceNode.services.startFlow(ExternalAsyncOperationFlow())
val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
val daveStart = aliceNode.services.startFlow(GetFlowInfoFlow(listOf(daveParty)))
charlieNode.services.startFlow(ReceiveFlow("Hello", listOf(charlieParty)))
@ -249,8 +251,7 @@ class FlowOperatorTests {
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for counter party (the flow must have counter party) to process grouped by party`() {
val future = CompletableFuture<String>()
aliceNode.services.startFlow(ExternalAsyncOperationFlow(future))
aliceNode.services.startFlow(ExternalAsyncOperationFlow())
val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
val daveStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
@ -380,12 +381,11 @@ class FlowOperatorTests {
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for async external operations`() {
val future = CompletableFuture<String>()
val start = aliceNode.services.startFlow(ExternalAsyncOperationFlow(future))
val start = aliceNode.services.startFlow(ExternalAsyncOperationFlow())
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds, { future.complete("Hello") }) {
executeTest(5.seconds, { aliceNode.services.startFlow(CompleteFutureFlow("Hello")) }) {
val result = cut.queryWaitingFlows(WaitingFlowQuery(
waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION)
)) // the list of counter parties must be empty to get any external operation
@ -400,12 +400,11 @@ class FlowOperatorTests {
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for external operations`() {
val future = CompletableFuture<String>()
val start = aliceNode.services.startFlow(ExternalOperationFlow(future))
val start = aliceNode.services.startFlow(ExternalOperationFlow())
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds, { future.complete("Hello") }) {
executeTest(5.seconds, { aliceNode.services.startFlow(CompleteFutureFlow("Hello")) }) {
val result = cut.queryWaitingFlows(WaitingFlowQuery())
assertEquals(1, result.size)
@ -512,33 +511,46 @@ class FlowOperatorTests {
}
@InitiatingFlow
class ExternalAsyncOperationFlow(private val future: CompletableFuture<String>) : FlowLogic<Unit>() {
class ExternalAsyncOperationFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(ExternalOperation(future))
await(ExternalOperation(serviceHub.cordaService(FutureService::class.java)))
}
class ExternalOperation(private val future: CompletableFuture<String>) : FlowExternalAsyncOperation<String> {
class ExternalOperation(private val futureService: FutureService) : FlowExternalAsyncOperation<String> {
override fun execute(deduplicationId: String): CompletableFuture<String> {
return future
return futureService.future
}
}
}
@InitiatingFlow
class ExternalOperationFlow(private val future: CompletableFuture<String>) : FlowLogic<Unit>() {
class ExternalOperationFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(ExternalOperation(future))
await(ExternalOperation(serviceHub.cordaService(FutureService::class.java)))
}
class ExternalOperation(private val future: CompletableFuture<String>) : FlowExternalOperation<String> {
class ExternalOperation(private val futureService: FutureService) : FlowExternalOperation<String> {
override fun execute(deduplicationId: String): String {
return future.get()
return futureService.future.get()
}
}
}
class CompleteFutureFlow(private val value: String) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
serviceHub.cordaService(FutureService::class.java).future.complete(value)
}
}
@Suppress("unused")
@CordaService
class FutureService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
val future = CompletableFuture<String>()
}
@InitiatingFlow
class SleepFlow : FlowLogic<Unit>() {
@Suspendable

View File

@ -6,6 +6,7 @@ import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.mapToSet
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.queryBy
@ -34,7 +35,6 @@ import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.Test
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.UUID
import kotlin.test.assertFailsWith
@ -75,7 +75,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with its own flow id and then manually releases them`() {
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref }
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates),
SoftLockAction(SoftLockingAction.UNLOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.UNLOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
@ -87,7 +87,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with its own flow id and by default releases them when completing`() {
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref }
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, null, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = vaultStates)
)
@ -98,7 +98,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with its own flow id and by default releases them when errors`() {
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref }
val softLockActions = arrayOf(
SoftLockAction(
SoftLockingAction.LOCK,
@ -106,7 +106,7 @@ class FlowSoftLocksTests {
vaultStates,
ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY),
expectedSoftLockedStates = vaultStates,
exception = IllegalStateException("Throwing error after flow has soft locked states")
throwException = { throw IllegalStateException("Throwing error after flow has soft locked states") }
)
)
assertFailsWith<IllegalStateException> {
@ -119,7 +119,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with random id and then manually releases them`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref }
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET),
SoftLockAction(SoftLockingAction.UNLOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.UNLOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
@ -132,7 +132,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with random id and does not release them upon completing`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toSet()
val vaultStates = fillVault(aliceNode, 10).states.mapToSet { it.ref }
val softLockActions = arrayOf(
SoftLockAction(SoftLockingAction.LOCK, randomId, vaultStates, ExpectedSoftLocks(vaultStates, QueryCriteria.SoftLockingType.LOCKED_ONLY), expectedSoftLockedStates = EMPTY_SET)
)
@ -145,7 +145,7 @@ class FlowSoftLocksTests {
fun `flow only releases by default reserved states with flow id upon completing`() {
// lock with flow id and random id, dont manually release any. At the end, check that only flow id ones got unlocked.
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val vaultStates = fillVault(aliceNode, 10).states.map { it.ref }
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
@ -161,7 +161,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with flow id and random id, then releases the flow id ones - assert the random id ones are still locked`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val vaultStates = fillVault(aliceNode, 10).states.map { it.ref }
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
@ -178,7 +178,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `flow reserves fungible states with flow id and random id, then releases the random id ones - assert the flow id ones are still locked inside the flow`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val vaultStates = fillVault(aliceNode, 10).states.map { it.ref }
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
@ -206,7 +206,7 @@ class FlowSoftLocksTests {
@Test(timeout=300_000)
fun `when flow soft locks, then errors and retries from previous checkpoint, softLockedStates are reverted back correctly`() {
val randomId = UUID.randomUUID()
val vaultStates = fillVault(aliceNode, 10)!!.states.map { it.ref }.toList()
val vaultStates = fillVault(aliceNode, 10).states.map { it.ref }
val flowIdStates = vaultStates.subList(0, vaultStates.size / 2).toSet()
val randomIdStates = vaultStates.subList(vaultStates.size / 2, vaultStates.size).toSet()
val softLockActions = arrayOf(
@ -226,7 +226,7 @@ class FlowSoftLocksTests {
randomIdStates,
ExpectedSoftLocks(EMPTY_SET, QueryCriteria.SoftLockingType.LOCKED_ONLY),
expectedSoftLockedStates = EMPTY_SET,
exception = SQLTransientConnectionException("connection is not available")
throwException = { throw SQLTransientConnectionException("connection is not available") }
)
)
val flowCompleted = aliceNode.services.startFlow(LockingUnlockingFlow(softLockActions)).resultFuture.getOrThrow(30.seconds)
@ -235,7 +235,7 @@ class FlowSoftLocksTests {
LockingUnlockingFlow.throwOnlyOnce = true
}
private fun fillVault(node: TestStartedNode, thisManyStates: Int): Vault<Cash.State>? {
private fun fillVault(node: TestStartedNode, thisManyStates: Int): Vault<Cash.State> {
val bankNode = mockNet.createPartyNode(BOC_NAME)
val bank = bankNode.info.singleIdentity()
val cashIssuer = bank.ref(1)
@ -265,7 +265,7 @@ data class SoftLockAction(val action: SoftLockingAction,
val states: Set<StateRef>,
val expectedSoftLocks: ExpectedSoftLocks,
val expectedSoftLockedStates: Set<StateRef>,
val exception: Exception? = null,
val throwException: (() -> Nothing)? = null,
val doCheckpoint: Boolean = false)
internal class LockingUnlockingFlow(private val softLockActions: Array<SoftLockAction>): FlowLogic<Boolean>() {
@ -296,10 +296,10 @@ internal class LockingUnlockingFlow(private val softLockActions: Array<SoftLockA
}
}
softLockAction.exception?.let {
if (softLockAction.throwException != null) {
if (throwOnlyOnce) {
throwOnlyOnce = false
throw it
softLockAction.throwException.invoke()
}
}
}

View File

@ -19,7 +19,6 @@ import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.coretesting.internal.rigorousMock
import net.corda.node.services.attachments.NodeAttachmentTrustCalculator
import net.corda.nodeapi.internal.serialization.kryo.CordaClassResolver
import net.corda.nodeapi.internal.serialization.kryo.CordaKryo
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.services.InternalMockAttachmentStorage
@ -173,12 +172,6 @@ class CordaClassResolverTests {
CordaClassResolver(emptyWhitelistContext).getRegistration(Interface::class.java)
}
@Test(timeout=300_000)
fun `Calling register method on modified Kryo does not consult the whitelist`() {
val kryo = CordaKryo(CordaClassResolver(emptyWhitelistContext))
kryo.register(NotSerializable::class.java)
}
@Test(timeout=300_000)
fun `Calling register method on unmodified Kryo does consult the whitelist`() {
val kryo = Kryo(CordaClassResolver(emptyWhitelistContext), MapReferenceResolver())

View File

@ -1,5 +1,8 @@
package net.corda.serialization.internal.amqp
import net.corda.core.internal.toImmutableList
import net.corda.core.internal.toImmutableSet
import net.corda.core.internal.toImmutableSortedSet
import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.NonEmptySet
import net.corda.serialization.internal.model.LocalTypeInformation
@ -9,8 +12,10 @@ import org.apache.qpid.proton.codec.Data
import java.io.NotSerializableException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.*
import kotlin.collections.LinkedHashSet
import java.util.Collections
import java.util.NavigableSet
import java.util.SortedSet
import java.util.TreeSet
/**
* Serialization / deserialization of predefined set of supported [Collection] types covering mostly [List]s and [Set]s.
@ -26,9 +31,9 @@ class CollectionSerializer(private val declaredType: ParameterizedType, factory:
// NB: Order matters in this map, the most specific classes should be listed at the end
private val supportedTypes: Map<Class<out Collection<*>>, (List<*>) -> Collection<*>> = Collections.unmodifiableMap(linkedMapOf(
Collection::class.java to { list -> Collections.unmodifiableCollection(list) },
List::class.java to { list -> Collections.unmodifiableList(list) },
Set::class.java to { list -> Collections.unmodifiableSet(LinkedHashSet(list)) },
SortedSet::class.java to { list -> Collections.unmodifiableSortedSet(TreeSet(list)) },
List::class.java to { list -> list.toImmutableList() },
Set::class.java to { list -> list.toImmutableSet() },
SortedSet::class.java to { list -> list.toImmutableSortedSet() },
NavigableSet::class.java to { list -> Collections.unmodifiableNavigableSet(TreeSet(list)) },
NonEmptySet::class.java to { list -> NonEmptySet.copyOf(list) }
))

View File

@ -1,5 +1,7 @@
package net.corda.serialization.internal.amqp
import net.corda.core.internal.toImmutableMap
import net.corda.core.internal.toImmutableSortedMap
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationContext
import net.corda.serialization.internal.model.LocalTypeInformation
@ -9,8 +11,13 @@ import org.apache.qpid.proton.codec.Data
import java.io.NotSerializableException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.*
import kotlin.collections.LinkedHashMap
import java.util.Collections
import java.util.Dictionary
import java.util.EnumMap
import java.util.NavigableMap
import java.util.SortedMap
import java.util.TreeMap
import java.util.WeakHashMap
private typealias MapCreationFunction = (Map<*, *>) -> Map<*, *>
@ -26,8 +33,8 @@ class MapSerializer(private val declaredType: ParameterizedType, factory: LocalS
// NB: Order matters in this map, the most specific classes should be listed at the end
private val supportedTypes: Map<Class<out Map<*, *>>, MapCreationFunction> = Collections.unmodifiableMap(linkedMapOf(
// Interfaces
Map::class.java to { map -> Collections.unmodifiableMap(map) },
SortedMap::class.java to { map -> Collections.unmodifiableSortedMap(TreeMap(map)) },
Map::class.java to { map -> map.toImmutableMap() },
SortedMap::class.java to { map -> map.toImmutableSortedMap() },
NavigableMap::class.java to { map -> Collections.unmodifiableNavigableMap(TreeMap(map)) },
// concrete classes for user convenience
LinkedHashMap::class.java to { map -> LinkedHashMap(map) },

View File

@ -64,6 +64,7 @@ include 'experimental:quasar-hook'
include 'experimental:corda-utils'
include 'experimental:nodeinfo'
include 'experimental:netparams'
include 'experimental:raft-tests'
include 'test-common'
include 'test-cli'
include 'test-utils'
@ -105,6 +106,7 @@ include 'samples:network-verifier:workflows'
include 'serialization'
include 'serialization-1.2'
include 'serialization-tests'
include 'checkpoint-tests'
include 'testing:cordapps:dbfailure:dbfcontracts'
include 'testing:cordapps:dbfailure:dbfworkflows'
include 'testing:cordapps:missingmigration'