diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 2def3fbf20..1ef330d3e1 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -7,6 +7,7 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT import java.time.Duration diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt index 2275cd0d3f..e044117699 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/KryoClientSerializationScheme.kt @@ -5,6 +5,11 @@ import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.* +import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme +import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 +import net.corda.nodeapi.internal.serialization.kryo.RPCKryo import java.util.concurrent.atomic.AtomicBoolean class KryoClientSerializationScheme : AbstractKryoSerializationScheme() { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt index bd6135522a..dc3712d56e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ClientContexts.kt @@ -4,15 +4,25 @@ package net.corda.nodeapi.internal.serialization import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults +import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0 +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 /* * Serialisation contexts for the client. * These have been refactored into a separate file to prevent * servers from trying to instantiate any of them. */ + val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), emptyMap(), true, SerializationContext.UseCase.RPCClient) + +val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0, + SerializationDefaults.javaClass.classLoader, + GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), + emptyMap(), + true, + SerializationContext.UseCase.RPCClient) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolver.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolver.kt index 97c50dcd67..52690a1780 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolver.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolver.kt @@ -12,6 +12,7 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializationContext import net.corda.core.utilities.loggerFor import net.corda.nodeapi.internal.serialization.amqp.hasAnnotationInHierarchy +import net.corda.nodeapi.internal.serialization.kryo.ThrowableSerializer import java.io.PrintWriter import java.lang.reflect.Modifier.isAbstract import java.nio.charset.StandardCharsets diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index a6be235f88..304340f342 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -1,26 +1,13 @@ package net.corda.nodeapi.internal.serialization -import co.paralleluniverse.fibers.Fiber -import co.paralleluniverse.io.serialization.kryo.KryoSerializer -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.KryoException -import com.esotericsoftware.kryo.Serializer -import com.esotericsoftware.kryo.io.Input -import com.esotericsoftware.kryo.io.Output -import com.esotericsoftware.kryo.pool.KryoPool -import com.esotericsoftware.kryo.serializers.ClosureSerializer import com.google.common.cache.Cache import com.google.common.cache.CacheBuilder import net.corda.core.contracts.Attachment import net.corda.core.crypto.SecureHash -import net.corda.core.internal.LazyPool -import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.OpaqueBytes import net.corda.nodeapi.internal.AttachmentsClassLoader import org.slf4j.LoggerFactory -import java.io.ByteArrayOutputStream import java.io.NotSerializableException import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -158,124 +145,8 @@ open class SerializationFactoryImpl : SerializationFactory() { override fun hashCode(): Int = registeredSchemes.hashCode() } -private object AutoCloseableSerialisationDetector : Serializer() { - override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) { - val message = "${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " + - "Restoring such resources across node restarts is not supported. Make sure code accessing it is " + - "confined to a private method or the reference is nulled out." - throw UnsupportedOperationException(message) - } - - override fun read(kryo: Kryo, input: Input, type: Class) = throw IllegalStateException("Should not reach here!") -} - -abstract class AbstractKryoSerializationScheme : SerializationScheme { - private val kryoPoolsForContexts = ConcurrentHashMap, KryoPool>() - - protected abstract fun rpcClientKryoPool(context: SerializationContext): KryoPool - protected abstract fun rpcServerKryoPool(context: SerializationContext): KryoPool - - private fun getPool(context: SerializationContext): KryoPool { - return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) { - when (context.useCase) { - SerializationContext.UseCase.Checkpoint -> - KryoPool.Builder { - val serializer = Fiber.getFiberSerializer(false) as KryoSerializer - val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) } - // TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that - val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true } - serializer.kryo.apply { - field.set(this, classResolver) - DefaultKryoCustomizer.customize(this) - addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector) - register(ClosureSerializer.Closure::class.java, CordaClosureSerializer) - classLoader = it.second - } - }.build() - SerializationContext.UseCase.RPCClient -> - rpcClientKryoPool(context) - SerializationContext.UseCase.RPCServer -> - rpcServerKryoPool(context) - else -> - KryoPool.Builder { - DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(context))).apply { classLoader = it.second } - }.build() - } - } - } - - private fun withContext(kryo: Kryo, context: SerializationContext, block: (Kryo) -> T): T { - kryo.context.ensureCapacity(context.properties.size) - context.properties.forEach { kryo.context.put(it.key, it.value) } - try { - return block(kryo) - } finally { - kryo.context.clear() - } - } - - override fun deserialize(byteSequence: ByteSequence, clazz: Class, context: SerializationContext): T { - val pool = getPool(context) - val headerSize = KryoHeaderV0_1.size - val header = byteSequence.take(headerSize) - if (header != KryoHeaderV0_1) { - throw KryoException("Serialized bytes header does not match expected format.") - } - Input(byteSequence.bytes, byteSequence.offset + headerSize, byteSequence.size - headerSize).use { input -> - return pool.run { kryo -> - withContext(kryo, context) { - if (context.objectReferencesEnabled) { - uncheckedCast(kryo.readClassAndObject(input)) - } else { - kryo.withoutReferences { uncheckedCast(kryo.readClassAndObject(input)) } - } - } - } - } - } - - override fun serialize(obj: T, context: SerializationContext): SerializedBytes { - val pool = getPool(context) - return pool.run { kryo -> - withContext(kryo, context) { - serializeOutputStreamPool.run { stream -> - serializeBufferPool.run { buffer -> - Output(buffer).use { - it.outputStream = stream - it.writeBytes(KryoHeaderV0_1.bytes) - if (context.objectReferencesEnabled) { - kryo.writeClassAndObject(it, obj) - } else { - kryo.withoutReferences { kryo.writeClassAndObject(it, obj) } - } - } - SerializedBytes(stream.toByteArray()) - } - } - } - } - } -} - -private val serializeBufferPool = LazyPool( - newInstance = { ByteArray(64 * 1024) } -) -private val serializeOutputStreamPool = LazyPool( - clear = ByteArrayOutputStream::reset, - shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large - newInstance = { ByteArrayOutputStream(64 * 1024) } -) - -// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB -val KryoHeaderV0_1: OpaqueBytes = OpaqueBytes("corda\u0000\u0000\u0001".toByteArray(Charsets.UTF_8)) -val KRYO_P2P_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.P2P) interface SerializationScheme { // byteSequence expected to just be the 8 bytes necessary for versioning diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index baab649669..7380cfdd0e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -6,6 +6,11 @@ import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0 +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 + +object QuasarWhitelist : ClassWhitelist { + override fun hasListed(type: Class<*>): Boolean = true +} /* * Serialisation contexts for the server. @@ -16,18 +21,28 @@ import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0 * CANNOT always be instantiated outside of the server and so * MUST be kept separate! */ + val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, SerializationDefaults.javaClass.classLoader, GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), emptyMap(), true, SerializationContext.UseCase.RPCServer) + val KRYO_STORAGE_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, SerializationDefaults.javaClass.classLoader, AllButBlacklisted, emptyMap(), true, SerializationContext.UseCase.Storage) + +val KRYO_P2P_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, + SerializationDefaults.javaClass.classLoader, + GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), + emptyMap(), + true, + SerializationContext.UseCase.P2P) + val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, SerializationDefaults.javaClass.classLoader, QuasarWhitelist, @@ -35,9 +50,6 @@ val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1, true, SerializationContext.UseCase.Checkpoint) -object QuasarWhitelist : ClassWhitelist { - override fun hasListed(type: Class<*>): Boolean = true -} val AMQP_STORAGE_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0, SerializationDefaults.javaClass.classLoader, @@ -45,3 +57,18 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0, emptyMap(), true, SerializationContext.UseCase.Storage) + +val AMQP_P2P_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0, + SerializationDefaults.javaClass.classLoader, + GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), + emptyMap(), + true, + SerializationContext.UseCase.P2P) + +val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0, + SerializationDefaults.javaClass.classLoader, + GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), + emptyMap(), + true, + SerializationContext.UseCase.RPCServer) + diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt similarity index 92% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/AMQPSerializationScheme.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 501840e3df..2f888c70fe 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -1,13 +1,12 @@ @file:JvmName("AMQPSerializationScheme") -package net.corda.nodeapi.internal.serialization +package net.corda.nodeapi.internal.serialization.amqp import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence -import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0 -import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput -import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput -import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.DefaultWhitelist +import net.corda.nodeapi.internal.serialization.MutableClassWhitelist +import net.corda.nodeapi.internal.serialization.SerializationScheme import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -25,7 +24,7 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) { } abstract class AbstractAMQPSerializationScheme : SerializationScheme { - internal companion object { + companion object { private val serializationWhitelists: List by lazy { ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist } @@ -132,9 +131,3 @@ class AMQPClientSerializationScheme : AbstractAMQPSerializationScheme() { } -val AMQP_P2P_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0, - SerializationDefaults.javaClass.classLoader, - GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()), - emptyMap(), - true, - SerializationContext.UseCase.P2P) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClosureSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CordaClosureSerializer.kt similarity index 94% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClosureSerializer.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CordaClosureSerializer.kt index ec032a8d63..c821509b84 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/CordaClosureSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/CordaClosureSerializer.kt @@ -1,4 +1,4 @@ -package net.corda.nodeapi.internal.serialization +package net.corda.nodeapi.internal.serialization.kryo import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.Output diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/DefaultKryoCustomizer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt similarity index 97% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/DefaultKryoCustomizer.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt index 7532ecffe9..e19fd49604 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/DefaultKryoCustomizer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/DefaultKryoCustomizer.kt @@ -1,4 +1,4 @@ -package net.corda.nodeapi.internal.serialization +package net.corda.nodeapi.internal.serialization.kryo import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer @@ -23,6 +23,10 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.toNonEmptySet +import net.corda.nodeapi.internal.serialization.CordaClassResolver +import net.corda.nodeapi.internal.serialization.DefaultWhitelist +import net.corda.nodeapi.internal.serialization.GeneratedAttachment +import net.corda.nodeapi.internal.serialization.MutableClassWhitelist import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPublicKey import org.bouncycastle.asn1.x500.X500Name diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/Kryo.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt similarity index 98% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/Kryo.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt index b4d0dc9c8c..8cdc24f941 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/Kryo.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/Kryo.kt @@ -1,4 +1,4 @@ -package net.corda.nodeapi.internal.serialization +package net.corda.nodeapi.internal.serialization.kryo import com.esotericsoftware.kryo.* import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory @@ -21,6 +21,8 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.toFuture import net.corda.core.toObservable import net.corda.core.transactions.* +import net.corda.nodeapi.internal.serialization.CordaClassResolver +import net.corda.nodeapi.internal.serialization.serializationContextKey import org.bouncycastle.asn1.ASN1InputStream import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.cert.X509CertificateHolder @@ -280,7 +282,7 @@ object SignedTransactionSerializer : Serializer() { sealed class UseCaseSerializer(private val allowedUseCases: EnumSet) : Serializer() { protected fun checkUseCase() { - checkUseCase(allowedUseCases) + net.corda.nodeapi.internal.serialization.checkUseCase(allowedUseCases) } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoSerializationScheme.kt new file mode 100644 index 0000000000..cff437dd59 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/KryoSerializationScheme.kt @@ -0,0 +1,132 @@ +package net.corda.nodeapi.internal.serialization.kryo + +import java.util.concurrent.ConcurrentHashMap +import java.io.ByteArrayOutputStream +import co.paralleluniverse.fibers.Fiber +import co.paralleluniverse.io.serialization.kryo.KryoSerializer +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoException +import com.esotericsoftware.kryo.Serializer +import com.esotericsoftware.kryo.io.Input +import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.pool.KryoPool +import com.esotericsoftware.kryo.serializers.ClosureSerializer +import net.corda.core.internal.uncheckedCast +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.ByteSequence +import net.corda.core.serialization.* +import net.corda.core.internal.LazyPool +import net.corda.nodeapi.internal.serialization.CordaClassResolver +import net.corda.nodeapi.internal.serialization.SerializationScheme + +// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB +val KryoHeaderV0_1: OpaqueBytes = OpaqueBytes("corda\u0000\u0000\u0001".toByteArray(Charsets.UTF_8)) + +private object AutoCloseableSerialisationDetector : Serializer() { + override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) { + val message = "${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " + + "Restoring such resources across node restarts is not supported. Make sure code accessing it is " + + "confined to a private method or the reference is nulled out." + throw UnsupportedOperationException(message) + } + + override fun read(kryo: Kryo, input: Input, type: Class) = throw IllegalStateException("Should not reach here!") +} + +abstract class AbstractKryoSerializationScheme : SerializationScheme { + private val kryoPoolsForContexts = ConcurrentHashMap, KryoPool>() + + protected abstract fun rpcClientKryoPool(context: SerializationContext): KryoPool + protected abstract fun rpcServerKryoPool(context: SerializationContext): KryoPool + + private fun getPool(context: SerializationContext): KryoPool { + return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) { + when (context.useCase) { + SerializationContext.UseCase.Checkpoint -> + KryoPool.Builder { + val serializer = Fiber.getFiberSerializer(false) as KryoSerializer + val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) } + // TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that + val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true } + serializer.kryo.apply { + field.set(this, classResolver) + DefaultKryoCustomizer.customize(this) + addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector) + register(ClosureSerializer.Closure::class.java, CordaClosureSerializer) + classLoader = it.second + } + }.build() + SerializationContext.UseCase.RPCClient -> + rpcClientKryoPool(context) + SerializationContext.UseCase.RPCServer -> + rpcServerKryoPool(context) + else -> + KryoPool.Builder { + DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(context))).apply { classLoader = it.second } + }.build() + } + } + } + + private fun withContext(kryo: Kryo, context: SerializationContext, block: (Kryo) -> T): T { + kryo.context.ensureCapacity(context.properties.size) + context.properties.forEach { kryo.context.put(it.key, it.value) } + try { + return block(kryo) + } finally { + kryo.context.clear() + } + } + + override fun deserialize(byteSequence: ByteSequence, clazz: Class, context: SerializationContext): T { + val pool = getPool(context) + val headerSize = KryoHeaderV0_1.size + val header = byteSequence.take(headerSize) + if (header != KryoHeaderV0_1) { + throw KryoException("Serialized bytes header does not match expected format.") + } + Input(byteSequence.bytes, byteSequence.offset + headerSize, byteSequence.size - headerSize).use { input -> + return pool.run { kryo -> + withContext(kryo, context) { + if (context.objectReferencesEnabled) { + uncheckedCast(kryo.readClassAndObject(input)) + } else { + kryo.withoutReferences { uncheckedCast(kryo.readClassAndObject(input)) } + } + } + } + } + } + + override fun serialize(obj: T, context: SerializationContext): SerializedBytes { + val pool = getPool(context) + return pool.run { kryo -> + withContext(kryo, context) { + serializeOutputStreamPool.run { stream -> + serializeBufferPool.run { buffer -> + Output(buffer).use { + it.outputStream = stream + it.writeBytes(KryoHeaderV0_1.bytes) + if (context.objectReferencesEnabled) { + kryo.writeClassAndObject(it, obj) + } else { + kryo.withoutReferences { kryo.writeClassAndObject(it, obj) } + } + } + SerializedBytes(stream.toByteArray()) + } + } + } + } + } +} + +private val serializeBufferPool = LazyPool( + newInstance = { ByteArray(64 * 1024) } +) + +private val serializeOutputStreamPool = LazyPool( + clear = ByteArrayOutputStream::reset, + shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large + newInstance = { ByteArrayOutputStream(64 * 1024) } +) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializeAsTokenSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/SerializeAsTokenSerializer.kt similarity index 96% rename from node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializeAsTokenSerializer.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/SerializeAsTokenSerializer.kt index 118eb299fb..eebe87a099 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializeAsTokenSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/kryo/SerializeAsTokenSerializer.kt @@ -1,4 +1,4 @@ -package net.corda.nodeapi.internal.serialization +package net.corda.nodeapi.internal.serialization.kryo import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.KryoException diff --git a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java index 26ef012df3..ba452d138c 100644 --- a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java +++ b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/ForbiddenLambdaSerializationTests.java @@ -5,6 +5,8 @@ import net.corda.core.serialization.SerializationContext; import net.corda.core.serialization.SerializationFactory; import net.corda.core.serialization.SerializedBytes; import net.corda.testing.SerializationEnvironmentRule; +import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer; +import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -31,7 +33,7 @@ public final class ForbiddenLambdaSerializationTests { EnumSet contexts = EnumSet.complementOf(EnumSet.of(SerializationContext.UseCase.Checkpoint)); contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx); + SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx); String value = "Hey"; Callable target = (Callable & Serializable) () -> value; @@ -54,7 +56,7 @@ public final class ForbiddenLambdaSerializationTests { EnumSet contexts = EnumSet.complementOf(EnumSet.of(SerializationContext.UseCase.Checkpoint)); contexts.forEach(ctx -> { - SerializationContext context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx); + SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx); String value = "Hey"; Callable target = () -> value; diff --git a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/LambdaCheckpointSerializationTest.java b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/LambdaCheckpointSerializationTest.java index c3230be211..93156d1073 100644 --- a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/LambdaCheckpointSerializationTest.java +++ b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/LambdaCheckpointSerializationTest.java @@ -5,6 +5,8 @@ import net.corda.core.serialization.SerializationContext; import net.corda.core.serialization.SerializationFactory; import net.corda.core.serialization.SerializedBytes; import net.corda.testing.SerializationEnvironmentRule; +import net.corda.nodeapi.internal.serialization.kryo.CordaClosureSerializer; +import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -24,7 +26,7 @@ public final class LambdaCheckpointSerializationTest { @Before public void setup() { factory = testSerialization.env.getSERIALIZATION_FACTORY(); - context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, SerializationContext.UseCase.Checkpoint); + context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, SerializationContext.UseCase.Checkpoint); } @Test diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolverTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolverTests.kt index 04f4c69122..155229a5bc 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolverTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/CordaClassResolverTests.kt @@ -10,6 +10,8 @@ import net.corda.core.node.services.AttachmentStorage import net.corda.core.serialization.* import net.corda.nodeapi.internal.AttachmentsClassLoader import net.corda.nodeapi.internal.AttachmentsClassLoaderTests +import net.corda.nodeapi.internal.serialization.kryo.CordaKryo +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import net.corda.testing.node.MockAttachmentStorage import net.corda.testing.rigorousMock import org.junit.Rule diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/KryoTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/KryoTests.kt index 07593368a7..cafeb14dd0 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/KryoTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/KryoTests.kt @@ -13,6 +13,7 @@ import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.sequence import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import net.corda.testing.ALICE_PUBKEY import net.corda.testing.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/ListsSerializationTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/ListsSerializationTest.kt index c59d26b72d..5663a78beb 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/ListsSerializationTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/ListsSerializationTest.kt @@ -7,6 +7,7 @@ import net.corda.node.services.statemachine.SessionData import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput import net.corda.nodeapi.internal.serialization.amqp.Envelope import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import net.corda.testing.amqpSpecific import net.corda.testing.kryoSpecific import net.corda.testing.SerializationEnvironmentRule diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/MapsSerializationTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/MapsSerializationTest.kt index 5cc64a5b64..ed45e63b68 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/MapsSerializationTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/MapsSerializationTest.kt @@ -6,6 +6,7 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.node.services.statemachine.SessionData +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import net.corda.testing.amqpSpecific import net.corda.testing.kryoSpecific import net.corda.testing.SerializationEnvironmentRule @@ -84,4 +85,4 @@ class MapsSerializationTest { } assertArrayEquals(output.toByteArray(), serializedForm.bytes) } -} \ No newline at end of file +} diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SerializationTokenTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SerializationTokenTest.kt index 6fc74fba55..bcca06f6de 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SerializationTokenTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SerializationTokenTest.kt @@ -5,6 +5,10 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.io.Output import net.corda.core.serialization.* import net.corda.core.utilities.OpaqueBytes +import net.corda.nodeapi.internal.serialization.kryo.CordaKryo +import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 +import net.corda.testing.TestDependencyInjectionBase import net.corda.testing.rigorousMock import net.corda.testing.SerializationEnvironmentRule import org.assertj.core.api.Assertions.assertThat diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SetsSerializationTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SetsSerializationTest.kt index 57cee0e7e5..243b73a803 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SetsSerializationTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/SetsSerializationTest.kt @@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.util.DefaultClassResolver import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.node.services.statemachine.SessionData +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import net.corda.testing.kryoSpecific import net.corda.testing.SerializationEnvironmentRule import org.junit.Assert.assertArrayEquals diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt index f3496a9f74..d6527fd816 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationOutputTests.kt @@ -13,7 +13,6 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializationFactory import net.corda.core.transactions.LedgerTransaction import net.corda.client.rpc.RPCException -import net.corda.nodeapi.internal.serialization.AbstractAMQPSerializationScheme import net.corda.nodeapi.internal.serialization.AllWhitelist import net.corda.nodeapi.internal.serialization.EmptyWhitelist import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.isPrimitive diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 992b433081..20eb06247a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -39,6 +39,7 @@ import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.serialization.* +import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.client.ActiveMQClient diff --git a/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt b/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt index 319312689b..09f876d937 100644 --- a/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt +++ b/node/src/main/kotlin/net/corda/node/serialization/KryoServerSerializationScheme.kt @@ -4,10 +4,10 @@ import com.esotericsoftware.kryo.pool.KryoPool import net.corda.core.serialization.SerializationContext import net.corda.core.utilities.ByteSequence import net.corda.node.services.messaging.RpcServerObservableSerializer -import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme -import net.corda.nodeapi.internal.serialization.DefaultKryoCustomizer -import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1 -import net.corda.nodeapi.internal.serialization.RPCKryo +import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme +import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 +import net.corda.nodeapi.internal.serialization.kryo.RPCKryo class KryoServerSerializationScheme : AbstractKryoSerializationScheme() { override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean { diff --git a/node/src/test/kotlin/net/corda/node/utilities/X509UtilitiesTest.kt b/node/src/test/kotlin/net/corda/node/utilities/X509UtilitiesTest.kt index a94b2d7d88..df81bef948 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/X509UtilitiesTest.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/X509UtilitiesTest.kt @@ -14,7 +14,7 @@ import net.corda.core.serialization.serialize import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.services.config.createKeystoreForCordaNode import net.corda.nodeapi.internal.serialization.AllWhitelist -import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1 +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import net.corda.nodeapi.internal.serialization.SerializationContextImpl import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl import net.corda.testing.ALICE diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index a1cda49a22..3155e38f27 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -24,7 +24,7 @@ import net.corda.node.utilities.CertificateAndKeyPair import net.corda.node.utilities.CertificateType import net.corda.node.utilities.X509Utilities import net.corda.nodeapi.config.SSLConfiguration -import net.corda.nodeapi.internal.serialization.AMQP_ENABLED +import net.corda.nodeapi.internal.serialization.amqp.AMQP_ENABLED import org.mockito.internal.stubbing.answers.ThrowsException import org.mockito.stubbing.Answer import java.nio.file.Files diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/SerializationTestHelpers.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/SerializationTestHelpers.kt index 857f9cb0fb..4738175ae4 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/SerializationTestHelpers.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/SerializationTestHelpers.kt @@ -9,6 +9,8 @@ import net.corda.core.serialization.internal.SerializationEnvironment import net.corda.core.utilities.ByteSequence import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.nodeapi.internal.serialization.* +import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme +import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme import org.junit.rules.TestRule import org.junit.runner.Description import org.junit.runners.model.Statement diff --git a/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt b/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt index 22ff4e9f60..ccc9a967db 100644 --- a/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt +++ b/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt @@ -18,8 +18,11 @@ import net.corda.nodeapi.config.NodeSSLConfiguration import net.corda.nodeapi.config.getValue import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.serialization.* +import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0 import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory +import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme +import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1 import org.apache.activemq.artemis.api.core.client.ActiveMQClient import java.nio.file.Path import java.nio.file.Paths