ENT-2659 Eliminate lots of contention in serialization (#4120)

* Remove most contentious bit of mutex in the codebase
Fix up SerializerFactory
Make getSerializerFactory() non-blocking
Workaround for DJVM issues

* Some clean up and also de-contend the RPC client.

* Fix up attachment class loader code.

* Bug fix
This commit is contained in:
Rick Parker 2018-10-30 15:26:46 +00:00 committed by GitHub
parent 30fedec343
commit 13815d252e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 100 additions and 73 deletions

View File

@ -1,20 +1,23 @@
package net.corda.client.rpc
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.utilities.days
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
/**
@ -293,7 +296,7 @@ class CordaRPCClient private constructor(
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
AMQPClientSerializationScheme.initialiseSerialization(classLoader)
AMQPClientSerializationScheme.initialiseSerialization(classLoader, Caffeine.newBuilder().maximumSize(128).build<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>().asMap())
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}

View File

@ -3,7 +3,7 @@ package net.corda.client.rpc.internal.serialization.amqp
import net.corda.core.cordapp.Cordapp
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationContext.*
import net.corda.core.serialization.SerializationContext.UseCase
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal.nodeSerializationEnv
@ -19,24 +19,25 @@ import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
* This scheme is for use by the RPC Client calls.
*/
class AMQPClientSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
serializerFactoriesForContexts: AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap { 128 })
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts)
@Suppress("UNUSED")
constructor() : this(emptySet(), AccessOrderLinkedHashMap { 128 })
companion object {
/** Call from main only. */
fun initialiseSerialization(classLoader: ClassLoader? = null) {
nodeSerializationEnv = createSerializationEnv(classLoader)
fun initialiseSerialization(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = AccessOrderLinkedHashMap { 128 }) {
nodeSerializationEnv = createSerializationEnv(classLoader, serializerFactoriesForContexts)
}
fun createSerializationEnv(classLoader: ClassLoader? = null): SerializationEnvironment {
fun createSerializationEnv(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = AccessOrderLinkedHashMap { 128 }): SerializationEnvironment {
return SerializationEnvironment.with(
SerializationFactoryImpl().apply {
registerScheme(AMQPClientSerializationScheme(emptyList()))
registerScheme(AMQPClientSerializationScheme(emptyList(), serializerFactoriesForContexts))
},
storageContext = AMQP_STORAGE_CONTEXT,
p2pContext = if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT,

View File

@ -3,6 +3,7 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricFilter
import com.codahale.metrics.MetricRegistry
import com.github.benmanes.caffeine.cache.Caffeine
import com.palominolabs.metrics.newrelic.AllEnabledMetricAttributeFilter
import com.palominolabs.metrics.newrelic.NewRelicReporter
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
@ -22,6 +23,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
@ -54,6 +56,7 @@ import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
import net.corda.serialization.internal.*
import net.corda.serialization.internal.amqp.SerializerFactory
import org.apache.commons.lang.SystemUtils
import org.h2.jdbc.JdbcSQLException
import org.slf4j.Logger
@ -471,8 +474,8 @@ open class Node(configuration: NodeConfiguration,
val classloader = cordappLoader.appClassLoader
nodeSerializationEnv = SerializationEnvironment.with(
SerializationFactoryImpl().apply {
registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps))
registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps))
registerScheme(AMQPServerSerializationScheme(cordappLoader.cordapps, Caffeine.newBuilder().maximumSize(128).build<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>().asMap()))
registerScheme(AMQPClientSerializationScheme(cordappLoader.cordapps, Caffeine.newBuilder().maximumSize(128).build<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>().asMap()))
},
p2pContext = AMQP_P2P_CONTEXT.withClassLoader(classloader),
rpcServerContext = AMQP_RPC_SERVER_CONTEXT.withClassLoader(classloader),

View File

@ -9,7 +9,6 @@ import net.corda.serialization.internal.amqp.AbstractAMQPSerializationScheme
import net.corda.serialization.internal.amqp.AccessOrderLinkedHashMap
import net.corda.serialization.internal.amqp.SerializerFactory
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
import java.util.concurrent.ConcurrentHashMap
/**
* When set as the serialization scheme, defines the RPC Server serialization scheme as using the Corda
@ -17,9 +16,10 @@ import java.util.concurrent.ConcurrentHashMap
*/
class AMQPServerSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
serializerFactoriesForContexts: AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap { 128 })
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts)
constructor() : this(emptySet(), AccessOrderLinkedHashMap { 128 })

View File

@ -1,13 +1,12 @@
package net.corda.serialization.internal
import net.corda.core.crypto.SecureHash
import java.lang.ClassLoader
/**
* Drop-in replacement for [AttachmentsClassLoaderBuilder] in the serialization module.
* This version is not strongly-coupled to [net.corda.core.node.ServiceHub].
*/
@Suppress("UNUSED", "UNUSED_PARAMETER")
internal class AttachmentsClassLoaderBuilder(private val properties: Map<Any, Any>, private val deserializationClassLoader: ClassLoader) {
fun build(attachmentHashes: List<SecureHash>): AttachmentsClassLoader? = null
internal class AttachmentsClassLoaderBuilder() {
fun build(attachmentHashes: List<SecureHash>, properties: Map<Any, Any>, deserializationClassLoader: ClassLoader): AttachmentsClassLoader? = null
}

View File

@ -2,8 +2,11 @@ package net.corda.serialization.internal
import net.corda.core.KeepForDJVM
import net.corda.core.crypto.SecureHash
import net.corda.core.serialization.*
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.EncodingWhitelist
import net.corda.core.serialization.SerializationEncoding
import net.corda.core.serialization.internal.CheckpointSerializationContext
import java.lang.UnsupportedOperationException
@KeepForDJVM
data class CheckpointSerializationContextImpl @JvmOverloads constructor(
@ -13,17 +16,13 @@ data class CheckpointSerializationContextImpl @JvmOverloads constructor(
override val objectReferencesEnabled: Boolean,
override val encoding: SerializationEncoding?,
override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) : CheckpointSerializationContext {
private val builder = AttachmentsClassLoaderBuilder(properties, deserializationClassLoader)
/**
* {@inheritDoc}
*
* We need to cache the AttachmentClassLoaders to avoid too many contexts, since the class loader is part of cache key for the context.
* Unsupported for checkpoints.
*/
override fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): CheckpointSerializationContext {
properties[attachmentsClassLoaderEnabledPropertyName] as? Boolean == true || return this
val classLoader = builder.build(attachmentHashes) ?: return this
return withClassLoader(classLoader)
throw UnsupportedOperationException()
}
override fun withProperty(property: Any, value: Any): CheckpointSerializationContext {

View File

@ -31,8 +31,10 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
override val useCase: SerializationContext.UseCase,
override val encoding: SerializationEncoding?,
override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist,
override val lenientCarpenterEnabled: Boolean = false) : SerializationContext {
private val builder = AttachmentsClassLoaderBuilder(properties, deserializationClassLoader)
override val lenientCarpenterEnabled: Boolean = false,
private val builder: AttachmentsClassLoaderBuilder = AttachmentsClassLoaderBuilder()
) : SerializationContext {
/**
* {@inheritDoc}
@ -41,7 +43,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
*/
override fun withAttachmentsClassLoader(attachmentHashes: List<SecureHash>): SerializationContext {
properties[attachmentsClassLoaderEnabledPropertyName] as? Boolean == true || return this
val classLoader = builder.build(attachmentHashes) ?: return this
val classLoader = builder.build(attachmentHashes, properties, deserializationClassLoader) ?: return this
return withClassLoader(classLoader)
}
@ -75,13 +77,13 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
* can replace it with an alternative version.
*/
@DeleteForDJVM
internal class AttachmentsClassLoaderBuilder(private val properties: Map<Any, Any>, private val deserializationClassLoader: ClassLoader) {
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build()
class AttachmentsClassLoaderBuilder() {
private val cache: Cache<Pair<List<SecureHash>, ClassLoader>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build()
fun build(attachmentHashes: List<SecureHash>): AttachmentsClassLoader? {
fun build(attachmentHashes: List<SecureHash>, properties: Map<Any, Any>, deserializationClassLoader: ClassLoader): AttachmentsClassLoader? {
val serializationContext = properties[serializationContextKey] as? SerializeAsTokenContext ?: return null // Some tests don't set one.
try {
return cache.get(attachmentHashes) {
return cache.get(Pair(attachmentHashes, deserializationClassLoader)) {
val missing = ArrayList<SecureHash>()
val attachments = ArrayList<Attachment>()
attachmentHashes.forEach { id ->
@ -120,12 +122,13 @@ open class SerializationFactoryImpl(
// truncate sequence to at most magicSize, and make sure it's a copy to avoid holding onto large ByteArrays
val magic = CordaSerializationMagic(byteSequence.slice(end = magicSize).copyBytes())
val lookupKey = magic to target
return schemes.computeIfAbsent(lookupKey) {
// ConcurrentHashMap.get() is lock free, but computeIfAbsent is not, even if the key is in the map already.
return (schemes[lookupKey] ?: schemes.computeIfAbsent(lookupKey) {
registeredSchemes.filter { it.canDeserializeVersion(magic, target) }.forEach { return@computeIfAbsent it } // XXX: Not single?
logger.warn("Cannot find serialization scheme for: [$lookupKey, " +
"${if (magic == amqpMagic) "AMQP" else "UNKNOWN MAGIC"}] registeredSchemes are: $registeredSchemes")
throw UnsupportedOperationException("Serialization scheme $lookupKey not supported.")
} to magic
}) to magic
}
@Throws(NotSerializableException::class)

View File

@ -40,12 +40,19 @@ interface SerializerFactoryFactory {
@KeepForDJVM
abstract class AbstractAMQPSerializationScheme(
private val cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
private val serializerFactoriesForContexts: AccessOrderLinkedHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>,
maybeNotConcurrentSerializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>,
val sff: SerializerFactoryFactory = createSerializerFactoryFactory()
) : SerializationScheme {
@DeleteForDJVM
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap(128))
// This is a bit gross but a broader check for ConcurrentMap is not allowed inside DJVM.
private val serializerFactoriesForContexts: MutableMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory> = if (maybeNotConcurrentSerializerFactoriesForContexts is AccessOrderLinkedHashMap<*, *>) {
Collections.synchronizedMap(maybeNotConcurrentSerializerFactoriesForContexts)
} else {
maybeNotConcurrentSerializerFactoriesForContexts
}
// TODO: This method of initialisation for the Whitelist and plugin serializers will have to change
// when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way
companion object {
@ -166,8 +173,9 @@ abstract class AbstractAMQPSerializationScheme(
open val publicKeySerializer: CustomSerializer<*> = net.corda.serialization.internal.amqp.custom.PublicKeySerializer
private fun getSerializerFactory(context: SerializationContext): SerializerFactory {
return synchronized(serializerFactoriesForContexts) {
serializerFactoriesForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
val key = Pair(context.whitelist, context.deserializationClassLoader)
// ConcurrentHashMap.get() is lock free, but computeIfAbsent is not, even if the key is in the map already.
return serializerFactoriesForContexts[key] ?: serializerFactoriesForContexts.computeIfAbsent(key) {
when (context.useCase) {
SerializationContext.UseCase.RPCClient ->
rpcClientSerializerFactory(context)
@ -178,7 +186,6 @@ abstract class AbstractAMQPSerializationScheme(
registerCustomSerializers(context, it)
}
}
}
}
override fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): T {

View File

@ -7,7 +7,10 @@ import net.corda.core.StubOutForDJVM
import net.corda.core.internal.kotlinObjectInstance
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.utilities.*
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.serialization.internal.carpenter.*
import org.apache.qpid.proton.amqp.*
import java.io.NotSerializableException
@ -95,6 +98,9 @@ open class SerializerFactory(
val classloader: ClassLoader get() = classCarpenter.classloader
// Used to short circuit any computation for a given input, for performance.
private data class MemoType(val actualClass: Class<*>?, val declaredType: Type) : Type
/**
* Look up, and manufacture if necessary, a serializer for the given type.
*
@ -106,50 +112,56 @@ open class SerializerFactory(
// can be useful to enable but will be *extremely* chatty if you do
logger.trace { "Get Serializer for $actualClass ${declaredType.typeName}" }
val declaredClass = declaredType.asClass()
val actualType: Type = if (actualClass == null) declaredType
else inferTypeVariables(actualClass, declaredClass, declaredType) ?: declaredType
val ourType = MemoType(actualClass, declaredType)
// ConcurrentHashMap.get() is lock free, but computeIfAbsent is not, even if the key is in the map already.
return serializersByType[ourType] ?: run {
val serializer = when {
val declaredClass = declaredType.asClass()
val actualType: Type = if (actualClass == null) declaredType
else inferTypeVariables(actualClass, declaredClass, declaredType) ?: declaredType
val serializer = when {
// Declared class may not be set to Collection, but actual class could be a collection.
// In this case use of CollectionSerializer is perfectly appropriate.
(Collection::class.java.isAssignableFrom(declaredClass) ||
(actualClass != null && Collection::class.java.isAssignableFrom(actualClass))) &&
!EnumSet::class.java.isAssignableFrom(actualClass ?: declaredClass) -> {
val declaredTypeAmended = CollectionSerializer.deriveParameterizedType(declaredType, declaredClass, actualClass)
serializersByType.computeIfAbsent(declaredTypeAmended) {
CollectionSerializer(declaredTypeAmended, this)
(Collection::class.java.isAssignableFrom(declaredClass) ||
(actualClass != null && Collection::class.java.isAssignableFrom(actualClass))) &&
!EnumSet::class.java.isAssignableFrom(actualClass ?: declaredClass) -> {
val declaredTypeAmended = CollectionSerializer.deriveParameterizedType(declaredType, declaredClass, actualClass)
serializersByType.computeIfAbsent(declaredTypeAmended) {
CollectionSerializer(declaredTypeAmended, this)
}
}
}
// Declared class may not be set to Map, but actual class could be a map.
// In this case use of MapSerializer is perfectly appropriate.
(Map::class.java.isAssignableFrom(declaredClass) ||
(actualClass != null && Map::class.java.isAssignableFrom(actualClass))) -> {
val declaredTypeAmended = MapSerializer.deriveParameterizedType(declaredType, declaredClass, actualClass)
serializersByType.computeIfAbsent(declaredTypeAmended) {
makeMapSerializer(declaredTypeAmended)
}
}
Enum::class.java.isAssignableFrom(actualClass ?: declaredClass) -> {
logger.trace {
"class=[${actualClass?.simpleName} | $declaredClass] is an enumeration " +
"declaredType=${declaredType.typeName} " +
"isEnum=${declaredType::class.java.isEnum}"
(Map::class.java.isAssignableFrom(declaredClass) ||
(actualClass != null && Map::class.java.isAssignableFrom(actualClass))) -> {
val declaredTypeAmended = MapSerializer.deriveParameterizedType(declaredType, declaredClass, actualClass)
serializersByType.computeIfAbsent(declaredTypeAmended) {
makeMapSerializer(declaredTypeAmended)
}
}
Enum::class.java.isAssignableFrom(actualClass ?: declaredClass) -> {
logger.trace {
"class=[${actualClass?.simpleName} | $declaredClass] is an enumeration " +
"declaredType=${declaredType.typeName} " +
"isEnum=${declaredType::class.java.isEnum}"
}
serializersByType.computeIfAbsent(actualClass ?: declaredClass) {
whitelist.requireWhitelisted(actualType)
EnumSerializer(actualType, actualClass ?: declaredClass, this)
serializersByType.computeIfAbsent(actualClass ?: declaredClass) {
whitelist.requireWhitelisted(actualType)
EnumSerializer(actualType, actualClass ?: declaredClass, this)
}
}
else -> {
makeClassSerializer(actualClass ?: declaredClass, actualType, declaredType)
}
}
else -> {
makeClassSerializer(actualClass ?: declaredClass, actualType, declaredType)
}
serializersByDescriptor.putIfAbsent(serializer.typeDescriptor, serializer)
// Always store the short-circuit too, for performance.
serializersByType.putIfAbsent(ourType, serializer)
return serializer
}
serializersByDescriptor.putIfAbsent(serializer.typeDescriptor, serializer)
return serializer
}
/**

View File

@ -63,9 +63,9 @@ class StaticInitialisationOfSerializedObjectTest {
// we can't actually construct one
sf.get(null, D::class.java)
// post creation of the serializer we should have one element in the map, this
// post creation of the serializer we should have two elements in the map, this
// proves we didn't statically construct an instance of C when building the serializer
assertEquals(1, serialisersByType.size)
assertEquals(2, serialisersByType.size)
}
@Test