From 84d94d44addc9a795799e832df487e50797d6da4 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Tue, 15 May 2018 12:03:33 +0100 Subject: [PATCH] ENT-1463: Hide more AMQP ConcurrentHashMaps behind interfaces. (#3147) --- .../serialization/SerializationScheme.kt | 10 ++++--- .../amqp/AMQPSerializationScheme.kt | 29 ++++++++++++------- .../amqp/SerializationSchemaTests.kt | 3 +- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index 33962ab02b..a94aad8786 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -92,7 +92,12 @@ internal class AttachmentsClassLoaderBuilder(private val properties: Map, SerializationScheme> +) : SerializationFactory() { + constructor() : this(ConcurrentHashMap()) + companion object { val magicSize = sequenceOf(kryoMagic, amqpMagic).map { it.size }.distinct().single() } @@ -103,9 +108,6 @@ open class SerializationFactoryImpl : SerializationFactory() { private val logger = LoggerFactory.getLogger(javaClass) - // TODO: This is read-mostly. Probably a faster implementation to be found. - private val schemes: ConcurrentHashMap, SerializationScheme> = ConcurrentHashMap() - private fun schemeFor(byteSequence: ByteSequence, target: SerializationContext.UseCase): Pair { // truncate sequence to at most magicSize, and make sure it's a copy to avoid holding onto large ByteArrays val magic = CordaSerializationMagic(byteSequence.slice(end = magicSize).copyBytes()) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 8811a63eac..505228a377 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -35,10 +35,11 @@ interface SerializerFactoryFactory { } abstract class AbstractAMQPSerializationScheme( - private val cordappCustomSerializers: Set>, - val sff: SerializerFactoryFactory = createSerializerFactoryFactory() + private val cordappCustomSerializers: Set>, + private val serializerFactoriesForContexts: MutableMap, SerializerFactory>, + val sff: SerializerFactoryFactory = createSerializerFactoryFactory() ) : SerializationScheme { - constructor(cordapps: List) : this(cordapps.customSerializers) + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) // TODO: This method of initialisation for the Whitelist and plugin serializers will have to change // when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way @@ -128,8 +129,6 @@ abstract class AbstractAMQPSerializationScheme( } } - private val serializerFactoriesForContexts = ConcurrentHashMap, SerializerFactory>() - protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory protected open val publicKeySerializer: CustomSerializer.Implements = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer @@ -165,9 +164,13 @@ abstract class AbstractAMQPSerializationScheme( } // TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented -class AMQPServerSerializationScheme(cordappCustomSerializers: Set> = emptySet()) - : AbstractAMQPSerializationScheme(cordappCustomSerializers) { - constructor(cordapps: List) : this(cordapps.customSerializers) +class AMQPServerSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> +) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + constructor() : this(emptySet(), ConcurrentHashMap()) override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() @@ -185,9 +188,13 @@ class AMQPServerSerializationScheme(cordappCustomSerializers: Set> = emptySet()) - : AbstractAMQPSerializationScheme(cordappCustomSerializers) { - constructor(cordapps: List) : this(cordapps.customSerializers) +class AMQPClientSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> +) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + constructor() : this(emptySet(), ConcurrentHashMap()) override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt index a9c75359b7..cd49520e42 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt @@ -4,6 +4,7 @@ import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.* import org.junit.Test +import java.util.concurrent.ConcurrentHashMap import kotlin.test.assertEquals // Make sure all serialization calls in this test don't get stomped on by anything else @@ -43,7 +44,7 @@ class TestSerializerFactoryFactory : SerializerFactoryFactoryImpl() { } } -class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptySet(), TestSerializerFactoryFactory()) { +class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptySet(), ConcurrentHashMap(), TestSerializerFactoryFactory()) { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() }