Fix cache races. (#4311)

This commit is contained in:
Tudor Malene 2018-11-27 16:36:02 +00:00 committed by GitHub
parent 5d2ad46553
commit 06ebb97a31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 17 additions and 10 deletions

View File

@ -1,6 +1,7 @@
package net.corda.client.rpc.internal.serialization.amqp package net.corda.client.rpc.internal.serialization.amqp
import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.toSynchronised
import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationContext.UseCase import net.corda.core.serialization.SerializationContext.UseCase
@ -19,19 +20,19 @@ class AMQPClientSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>, cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { ) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap { 128 }) constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts) constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts)
@Suppress("UNUSED") @Suppress("UNUSED")
constructor() : this(emptySet(), AccessOrderLinkedHashMap { 128 }) constructor() : this(emptySet(), AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised())
companion object { companion object {
/** Call from main only. */ /** Call from main only. */
fun initialiseSerialization(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = AccessOrderLinkedHashMap { 128 }) { fun initialiseSerialization(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised()) {
nodeSerializationEnv = createSerializationEnv(classLoader, serializerFactoriesForContexts) nodeSerializationEnv = createSerializationEnv(classLoader, serializerFactoriesForContexts)
} }
fun createSerializationEnv(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = AccessOrderLinkedHashMap { 128 }): SerializationEnvironment { fun createSerializationEnv(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised()): SerializationEnvironment {
return SerializationEnvironment.with( return SerializationEnvironment.with(
SerializationFactoryImpl().apply { SerializationFactoryImpl().apply {
registerScheme(AMQPClientSerializationScheme(emptyList(), serializerFactoriesForContexts)) registerScheme(AMQPClientSerializationScheme(emptyList(), serializerFactoriesForContexts))

View File

@ -1,6 +1,7 @@
package net.corda.node.serialization.amqp package net.corda.node.serialization.amqp
import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.toSynchronised
import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationCustomSerializer import net.corda.core.serialization.SerializationCustomSerializer
@ -19,10 +20,10 @@ class AMQPServerSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>, cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { ) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap { 128 }) constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts) constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts)
constructor() : this(emptySet(), AccessOrderLinkedHashMap { 128 }) constructor() : this(emptySet(), AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised() )
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException() throw UnsupportedOperationException()

View File

@ -8,6 +8,8 @@ import com.nhaarman.mockito_kotlin.mock
import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSerializer
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.internal.toSynchronised
import net.corda.core.serialization.ClassWhitelist
import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme import net.corda.node.internal.serialization.testutils.AMQPRoundTripRPCSerializationScheme
import net.corda.node.internal.serialization.testutils.serializationContext import net.corda.node.internal.serialization.testutils.serializationContext
import net.corda.node.serialization.amqp.RpcServerObservableSerializer import net.corda.node.serialization.amqp.RpcServerObservableSerializer
@ -16,6 +18,7 @@ import net.corda.nodeapi.RPCApi
import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap
import net.corda.serialization.internal.amqp.DeserializationInput import net.corda.serialization.internal.amqp.DeserializationInput
import net.corda.serialization.internal.amqp.SerializationOutput import net.corda.serialization.internal.amqp.SerializationOutput
import net.corda.serialization.internal.amqp.SerializerFactory
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Test import org.junit.Test
import rx.Notification import rx.Notification
@ -60,7 +63,7 @@ class RoundTripObservableSerializerTests {
@Test @Test
fun roundTripTest1() { fun roundTripTest1() {
val serializationScheme = AMQPRoundTripRPCSerializationScheme( val serializationScheme = AMQPRoundTripRPCSerializationScheme(
serializationContext, emptySet(), AccessOrderLinkedHashMap { 128 }) serializationContext, emptySet(), AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised())
// Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap, // Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap,
// the client as a property of the deserializer which, in the actual RPC client, is pulled off of // the client as a property of the deserializer which, in the actual RPC client, is pulled off of

View File

@ -24,7 +24,7 @@ import net.corda.client.rpc.internal.ObservableContext as ClientObservableContex
class AMQPRoundTripRPCSerializationScheme( class AMQPRoundTripRPCSerializationScheme(
private val serializationContext: SerializationContext, private val serializationContext: SerializationContext,
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>, cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
serializerFactoriesForContexts: AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>)
: AbstractAMQPSerializationScheme( : AbstractAMQPSerializationScheme(
cordappCustomSerializers, serializerFactoriesForContexts cordappCustomSerializers, serializerFactoriesForContexts
) { ) {

View File

@ -9,6 +9,7 @@ import net.corda.core.StubOutForDJVM
import net.corda.core.cordapp.Cordapp import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.isAbstractClass import net.corda.core.internal.isAbstractClass
import net.corda.core.internal.objectOrNewInstance import net.corda.core.internal.objectOrNewInstance
import net.corda.core.internal.toSynchronised
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.* import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.ByteSequence
@ -44,7 +45,7 @@ abstract class AbstractAMQPSerializationScheme(
val sff: SerializerFactoryFactory = createSerializerFactoryFactory() val sff: SerializerFactoryFactory = createSerializerFactoryFactory()
) : SerializationScheme { ) : SerializationScheme {
@DeleteForDJVM @DeleteForDJVM
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap(128)) constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>(128).toSynchronised())
// This is a bit gross but a broader check for ConcurrentMap is not allowed inside DJVM. // This is a bit gross but a broader check for ConcurrentMap is not allowed inside DJVM.
private val serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = if (maybeNotConcurrentSerializerFactoriesForContexts is AccessOrderLinkedHashMap<*, *>) { private val serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = if (maybeNotConcurrentSerializerFactoriesForContexts is AccessOrderLinkedHashMap<*, *>) {

View File

@ -1,5 +1,6 @@
package net.corda.serialization.internal.amqp package net.corda.serialization.internal.amqp
import net.corda.core.internal.toSynchronised
import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
@ -39,7 +40,7 @@ class AbstractAMQPSerializationSchemeTest {
val factory = SerializerFactoryBuilder.build(TESTING_CONTEXT.whitelist, TESTING_CONTEXT.deserializationClassLoader) val factory = SerializerFactoryBuilder.build(TESTING_CONTEXT.whitelist, TESTING_CONTEXT.deserializationClassLoader)
val maxFactories = 512 val maxFactories = 512
val backingMap = AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>({ maxFactories }) val backingMap = AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>({ maxFactories }).toSynchronised()
val scheme = object : AbstractAMQPSerializationScheme(emptySet(), backingMap, createSerializerFactoryFactory()) { val scheme = object : AbstractAMQPSerializationScheme(emptySet(), backingMap, createSerializerFactoryFactory()) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return factory return factory