diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt index 6e094e87ee..473443bc9a 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/serialization/amqp/AMQPClientSerializationScheme.kt @@ -1,6 +1,7 @@ package net.corda.client.rpc.internal.serialization.amqp import net.corda.core.cordapp.Cordapp +import net.corda.core.internal.toSynchronised import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext.UseCase @@ -19,19 +20,19 @@ class AMQPClientSerializationScheme( cordappCustomSerializers: Set>, serializerFactoriesForContexts: MutableMap, SerializerFactory> ) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { - constructor(cordapps: List) : this(cordapps.customSerializers, AccessOrderLinkedHashMap { 128 }) + constructor(cordapps: List) : this(cordapps.customSerializers, AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()) constructor(cordapps: List, serializerFactoriesForContexts: MutableMap, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts) @Suppress("UNUSED") - constructor() : this(emptySet(), AccessOrderLinkedHashMap { 128 }) + constructor() : this(emptySet(), AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()) companion object { /** Call from main only. */ - fun initialiseSerialization(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap, SerializerFactory> = AccessOrderLinkedHashMap { 128 }) { + fun initialiseSerialization(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap, SerializerFactory> = AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()) { nodeSerializationEnv = createSerializationEnv(classLoader, serializerFactoriesForContexts) } - fun createSerializationEnv(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap, SerializerFactory> = AccessOrderLinkedHashMap { 128 }): SerializationEnvironment { + fun createSerializationEnv(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap, SerializerFactory> = AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()): SerializationEnvironment { return SerializationEnvironment.with( SerializationFactoryImpl().apply { registerScheme(AMQPClientSerializationScheme(emptyList(), serializerFactoriesForContexts)) diff --git a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt index 1cb164b0eb..e1cc44d805 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/amqp/AMQPServerSerializationScheme.kt @@ -1,6 +1,7 @@ package net.corda.node.serialization.amqp import net.corda.core.cordapp.Cordapp +import net.corda.core.internal.toSynchronised import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationCustomSerializer @@ -19,10 +20,10 @@ class AMQPServerSerializationScheme( cordappCustomSerializers: Set>, serializerFactoriesForContexts: MutableMap, SerializerFactory> ) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { - constructor(cordapps: List) : this(cordapps.customSerializers, AccessOrderLinkedHashMap { 128 }) + constructor(cordapps: List) : this(cordapps.customSerializers, AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()) constructor(cordapps: List, serializerFactoriesForContexts: MutableMap, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts) - constructor() : this(emptySet(), AccessOrderLinkedHashMap { 128 }) + constructor() : this(emptySet(), AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised() ) override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt index 9a0a8d042c..4e7729d20e 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/RoundTripObservableSerializerTests.kt @@ -8,6 +8,8 @@ import com.nhaarman.mockito_kotlin.mock import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.core.context.Trace import net.corda.core.internal.ThreadBox +import net.corda.core.internal.toSynchronised +import net.corda.core.serialization.ClassWhitelist import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme import net.corda.node.internal.serialization.testutils.serializationContext import net.corda.node.serialization.amqp.RpcServerObservableSerializer @@ -16,6 +18,7 @@ import net.corda.nodeapi.RPCApi import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap import net.corda.serialization.internal.amqp.DeserializationInput import net.corda.serialization.internal.amqp.SerializationOutput +import net.corda.serialization.internal.amqp.SerializerFactory import org.apache.activemq.artemis.api.core.SimpleString import org.junit.Test import rx.Notification @@ -60,7 +63,7 @@ class RoundTripObservableSerializerTests { @Test fun roundTripTest1() { val serializationScheme = AMQPRoundTripRPCSerializationScheme( - serializationContext, emptySet(), AccessOrderLinkedHashMap { 128 }) + serializationContext, emptySet(), AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()) // Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap, // the client as a property of the deserializer which, in the actual RPC client, is pulled off of diff --git a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt index 7a803cb661..86629bcbed 100644 --- a/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt +++ b/node/src/test/kotlin/net/corda/node/internal/serialization/testutils/AMQPTestSerialiationScheme.kt @@ -24,7 +24,7 @@ import net.corda.client.rpc.internal.ObservableContext as ClientObservableContex class AMQPRoundTripRPCSerializationScheme( private val serializationContext: SerializationContext, cordappCustomSerializers: Set>, - serializerFactoriesForContexts: AccessOrderLinkedHashMap, SerializerFactory>) + serializerFactoriesForContexts: MutableMap, SerializerFactory>) : AbstractAMQPSerializationScheme( cordappCustomSerializers, serializerFactoriesForContexts ) { diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/AMQPSerializationScheme.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/AMQPSerializationScheme.kt index b9feeccc0a..7cbe22b116 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/AMQPSerializationScheme.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/AMQPSerializationScheme.kt @@ -9,6 +9,7 @@ import net.corda.core.StubOutForDJVM import net.corda.core.cordapp.Cordapp import net.corda.core.internal.isAbstractClass import net.corda.core.internal.objectOrNewInstance +import net.corda.core.internal.toSynchronised import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence @@ -44,7 +45,7 @@ abstract class AbstractAMQPSerializationScheme( val sff: SerializerFactoryFactory = createSerializerFactoryFactory() ) : SerializationScheme { @DeleteForDJVM - constructor(cordapps: List) : this(cordapps.customSerializers, AccessOrderLinkedHashMap(128)) + constructor(cordapps: List) : this(cordapps.customSerializers, AccessOrderLinkedHashMap, SerializerFactory>(128).toSynchronised()) // This is a bit gross but a broader check for ConcurrentMap is not allowed inside DJVM. private val serializerFactoriesForContexts: MutableMap, SerializerFactory> = if (maybeNotConcurrentSerializerFactoriesForContexts is AccessOrderLinkedHashMap<*, *>) { diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/AbstractAMQPSerializationSchemeTest.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/AbstractAMQPSerializationSchemeTest.kt index eaaeffb043..b8be1f9b3b 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/AbstractAMQPSerializationSchemeTest.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/AbstractAMQPSerializationSchemeTest.kt @@ -1,5 +1,6 @@ package net.corda.serialization.internal.amqp +import net.corda.core.internal.toSynchronised import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.deserialize @@ -39,7 +40,7 @@ class AbstractAMQPSerializationSchemeTest { val factory = SerializerFactoryBuilder.build(TESTING_CONTEXT.whitelist, TESTING_CONTEXT.deserializationClassLoader) val maxFactories = 512 - val backingMap = AccessOrderLinkedHashMap, SerializerFactory>({ maxFactories }) + val backingMap = AccessOrderLinkedHashMap, SerializerFactory>({ maxFactories }).toSynchronised() val scheme = object : AbstractAMQPSerializationScheme(emptySet(), backingMap, createSerializerFactoryFactory()) { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { return factory