mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
CORDA-756 - Refactor Kryo into sub module as per AMQP
This commit is contained in:
parent
7f96205b99
commit
d6adb83cd0
@ -7,6 +7,7 @@ import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_RPC_CLIENT_CONTEXT
|
||||
import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
|
||||
import java.time.Duration
|
||||
|
||||
|
@ -5,6 +5,11 @@ import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
class KryoClientSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
|
@ -4,15 +4,25 @@ package net.corda.nodeapi.internal.serialization
|
||||
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
|
||||
/*
|
||||
* Serialisation contexts for the client.
|
||||
* These have been refactored into a separate file to prevent
|
||||
* servers from trying to instantiate any of them.
|
||||
*/
|
||||
|
||||
val KRYO_RPC_CLIENT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.RPCClient)
|
||||
|
||||
val AMQP_RPC_CLIENT_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.RPCClient)
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.internal.serialization.amqp.hasAnnotationInHierarchy
|
||||
import net.corda.nodeapi.internal.serialization.kryo.ThrowableSerializer
|
||||
import java.io.PrintWriter
|
||||
import java.lang.reflect.Modifier.isAbstract
|
||||
import java.nio.charset.StandardCharsets
|
||||
|
@ -1,26 +1,13 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.esotericsoftware.kryo.serializers.ClosureSerializer
|
||||
import com.google.common.cache.Cache
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.nodeapi.internal.AttachmentsClassLoader
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.io.NotSerializableException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
@ -158,124 +145,8 @@ open class SerializationFactoryImpl : SerializationFactory() {
|
||||
override fun hashCode(): Int = registeredSchemes.hashCode()
|
||||
}
|
||||
|
||||
private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>() {
|
||||
override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) {
|
||||
val message = "${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " +
|
||||
"Restoring such resources across node restarts is not supported. Make sure code accessing it is " +
|
||||
"confined to a private method or the reference is nulled out."
|
||||
throw UnsupportedOperationException(message)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<AutoCloseable>) = throw IllegalStateException("Should not reach here!")
|
||||
}
|
||||
|
||||
abstract class AbstractKryoSerializationScheme : SerializationScheme {
|
||||
private val kryoPoolsForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, KryoPool>()
|
||||
|
||||
protected abstract fun rpcClientKryoPool(context: SerializationContext): KryoPool
|
||||
protected abstract fun rpcServerKryoPool(context: SerializationContext): KryoPool
|
||||
|
||||
private fun getPool(context: SerializationContext): KryoPool {
|
||||
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||
when (context.useCase) {
|
||||
SerializationContext.UseCase.Checkpoint ->
|
||||
KryoPool.Builder {
|
||||
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
|
||||
val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) }
|
||||
// TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that
|
||||
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
|
||||
serializer.kryo.apply {
|
||||
field.set(this, classResolver)
|
||||
DefaultKryoCustomizer.customize(this)
|
||||
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
|
||||
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
|
||||
classLoader = it.second
|
||||
}
|
||||
}.build()
|
||||
SerializationContext.UseCase.RPCClient ->
|
||||
rpcClientKryoPool(context)
|
||||
SerializationContext.UseCase.RPCServer ->
|
||||
rpcServerKryoPool(context)
|
||||
else ->
|
||||
KryoPool.Builder {
|
||||
DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(context))).apply { classLoader = it.second }
|
||||
}.build()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T : Any> withContext(kryo: Kryo, context: SerializationContext, block: (Kryo) -> T): T {
|
||||
kryo.context.ensureCapacity(context.properties.size)
|
||||
context.properties.forEach { kryo.context.put(it.key, it.value) }
|
||||
try {
|
||||
return block(kryo)
|
||||
} finally {
|
||||
kryo.context.clear()
|
||||
}
|
||||
}
|
||||
|
||||
override fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): T {
|
||||
val pool = getPool(context)
|
||||
val headerSize = KryoHeaderV0_1.size
|
||||
val header = byteSequence.take(headerSize)
|
||||
if (header != KryoHeaderV0_1) {
|
||||
throw KryoException("Serialized bytes header does not match expected format.")
|
||||
}
|
||||
Input(byteSequence.bytes, byteSequence.offset + headerSize, byteSequence.size - headerSize).use { input ->
|
||||
return pool.run { kryo ->
|
||||
withContext(kryo, context) {
|
||||
if (context.objectReferencesEnabled) {
|
||||
uncheckedCast(kryo.readClassAndObject(input))
|
||||
} else {
|
||||
kryo.withoutReferences { uncheckedCast<Any?, T>(kryo.readClassAndObject(input)) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T> {
|
||||
val pool = getPool(context)
|
||||
return pool.run { kryo ->
|
||||
withContext(kryo, context) {
|
||||
serializeOutputStreamPool.run { stream ->
|
||||
serializeBufferPool.run { buffer ->
|
||||
Output(buffer).use {
|
||||
it.outputStream = stream
|
||||
it.writeBytes(KryoHeaderV0_1.bytes)
|
||||
if (context.objectReferencesEnabled) {
|
||||
kryo.writeClassAndObject(it, obj)
|
||||
} else {
|
||||
kryo.withoutReferences { kryo.writeClassAndObject(it, obj) }
|
||||
}
|
||||
}
|
||||
SerializedBytes(stream.toByteArray())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val serializeBufferPool = LazyPool(
|
||||
newInstance = { ByteArray(64 * 1024) }
|
||||
)
|
||||
private val serializeOutputStreamPool = LazyPool(
|
||||
clear = ByteArrayOutputStream::reset,
|
||||
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
|
||||
newInstance = { ByteArrayOutputStream(64 * 1024) }
|
||||
)
|
||||
|
||||
// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB
|
||||
val KryoHeaderV0_1: OpaqueBytes = OpaqueBytes("corda\u0000\u0000\u0001".toByteArray(Charsets.UTF_8))
|
||||
|
||||
|
||||
val KRYO_P2P_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
interface SerializationScheme {
|
||||
// byteSequence expected to just be the 8 bytes necessary for versioning
|
||||
|
@ -6,6 +6,11 @@ import net.corda.core.serialization.ClassWhitelist
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
|
||||
object QuasarWhitelist : ClassWhitelist {
|
||||
override fun hasListed(type: Class<*>): Boolean = true
|
||||
}
|
||||
|
||||
/*
|
||||
* Serialisation contexts for the server.
|
||||
@ -16,18 +21,28 @@ import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
|
||||
* CANNOT always be instantiated outside of the server and so
|
||||
* MUST be kept separate!
|
||||
*/
|
||||
|
||||
val KRYO_RPC_SERVER_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.RPCServer)
|
||||
|
||||
val KRYO_STORAGE_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
AllButBlacklisted,
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.Storage)
|
||||
|
||||
val KRYO_P2P_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
QuasarWhitelist,
|
||||
@ -35,9 +50,6 @@ val KRYO_CHECKPOINT_CONTEXT = SerializationContextImpl(KryoHeaderV0_1,
|
||||
true,
|
||||
SerializationContext.UseCase.Checkpoint)
|
||||
|
||||
object QuasarWhitelist : ClassWhitelist {
|
||||
override fun hasListed(type: Class<*>): Boolean = true
|
||||
}
|
||||
|
||||
val AMQP_STORAGE_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
@ -45,3 +57,18 @@ val AMQP_STORAGE_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.Storage)
|
||||
|
||||
val AMQP_P2P_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
||||
|
||||
val AMQP_RPC_SERVER_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.RPCServer)
|
||||
|
||||
|
@ -1,13 +1,12 @@
|
||||
@file:JvmName("AMQPSerializationScheme")
|
||||
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
|
||||
import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializationOutput
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
@ -25,7 +24,7 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) {
|
||||
}
|
||||
|
||||
abstract class AbstractAMQPSerializationScheme : SerializationScheme {
|
||||
internal companion object {
|
||||
companion object {
|
||||
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
|
||||
ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist
|
||||
}
|
||||
@ -132,9 +131,3 @@ class AMQPClientSerializationScheme : AbstractAMQPSerializationScheme() {
|
||||
|
||||
}
|
||||
|
||||
val AMQP_P2P_CONTEXT = SerializationContextImpl(AmqpHeaderV1_0,
|
||||
SerializationDefaults.javaClass.classLoader,
|
||||
GlobalTransientClassWhiteList(BuiltInExceptionsWhitelist()),
|
||||
emptyMap(),
|
||||
true,
|
||||
SerializationContext.UseCase.P2P)
|
@ -1,4 +1,4 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.io.Output
|
@ -1,4 +1,4 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
@ -23,6 +23,10 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.nodeapi.internal.serialization.CordaClassResolver
|
||||
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.GeneratedAttachment
|
||||
import net.corda.nodeapi.internal.serialization.MutableClassWhitelist
|
||||
import net.i2p.crypto.eddsa.EdDSAPrivateKey
|
||||
import net.i2p.crypto.eddsa.EdDSAPublicKey
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
@ -1,4 +1,4 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.*
|
||||
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory
|
||||
@ -21,6 +21,8 @@ import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.toObservable
|
||||
import net.corda.core.transactions.*
|
||||
import net.corda.nodeapi.internal.serialization.CordaClassResolver
|
||||
import net.corda.nodeapi.internal.serialization.serializationContextKey
|
||||
import org.bouncycastle.asn1.ASN1InputStream
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.bouncycastle.cert.X509CertificateHolder
|
||||
@ -280,7 +282,7 @@ object SignedTransactionSerializer : Serializer<SignedTransaction>() {
|
||||
|
||||
sealed class UseCaseSerializer<T>(private val allowedUseCases: EnumSet<SerializationContext.UseCase>) : Serializer<T>() {
|
||||
protected fun checkUseCase() {
|
||||
checkUseCase(allowedUseCases)
|
||||
net.corda.nodeapi.internal.serialization.checkUseCase(allowedUseCases)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,132 @@
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.io.ByteArrayOutputStream
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.esotericsoftware.kryo.serializers.ClosureSerializer
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.nodeapi.internal.serialization.CordaClassResolver
|
||||
import net.corda.nodeapi.internal.serialization.SerializationScheme
|
||||
|
||||
// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB
|
||||
val KryoHeaderV0_1: OpaqueBytes = OpaqueBytes("corda\u0000\u0000\u0001".toByteArray(Charsets.UTF_8))
|
||||
|
||||
private object AutoCloseableSerialisationDetector : Serializer<AutoCloseable>() {
|
||||
override fun write(kryo: Kryo, output: Output, closeable: AutoCloseable) {
|
||||
val message = "${closeable.javaClass.name}, which is a closeable resource, has been detected during flow checkpointing. " +
|
||||
"Restoring such resources across node restarts is not supported. Make sure code accessing it is " +
|
||||
"confined to a private method or the reference is nulled out."
|
||||
throw UnsupportedOperationException(message)
|
||||
}
|
||||
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<AutoCloseable>) = throw IllegalStateException("Should not reach here!")
|
||||
}
|
||||
|
||||
abstract class AbstractKryoSerializationScheme : SerializationScheme {
|
||||
private val kryoPoolsForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, KryoPool>()
|
||||
|
||||
protected abstract fun rpcClientKryoPool(context: SerializationContext): KryoPool
|
||||
protected abstract fun rpcServerKryoPool(context: SerializationContext): KryoPool
|
||||
|
||||
private fun getPool(context: SerializationContext): KryoPool {
|
||||
return kryoPoolsForContexts.computeIfAbsent(Pair(context.whitelist, context.deserializationClassLoader)) {
|
||||
when (context.useCase) {
|
||||
SerializationContext.UseCase.Checkpoint ->
|
||||
KryoPool.Builder {
|
||||
val serializer = Fiber.getFiberSerializer(false) as KryoSerializer
|
||||
val classResolver = CordaClassResolver(context).apply { setKryo(serializer.kryo) }
|
||||
// TODO The ClassResolver can only be set in the Kryo constructor and Quasar doesn't provide us with a way of doing that
|
||||
val field = Kryo::class.java.getDeclaredField("classResolver").apply { isAccessible = true }
|
||||
serializer.kryo.apply {
|
||||
field.set(this, classResolver)
|
||||
DefaultKryoCustomizer.customize(this)
|
||||
addDefaultSerializer(AutoCloseable::class.java, AutoCloseableSerialisationDetector)
|
||||
register(ClosureSerializer.Closure::class.java, CordaClosureSerializer)
|
||||
classLoader = it.second
|
||||
}
|
||||
}.build()
|
||||
SerializationContext.UseCase.RPCClient ->
|
||||
rpcClientKryoPool(context)
|
||||
SerializationContext.UseCase.RPCServer ->
|
||||
rpcServerKryoPool(context)
|
||||
else ->
|
||||
KryoPool.Builder {
|
||||
DefaultKryoCustomizer.customize(CordaKryo(CordaClassResolver(context))).apply { classLoader = it.second }
|
||||
}.build()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T : Any> withContext(kryo: Kryo, context: SerializationContext, block: (Kryo) -> T): T {
|
||||
kryo.context.ensureCapacity(context.properties.size)
|
||||
context.properties.forEach { kryo.context.put(it.key, it.value) }
|
||||
try {
|
||||
return block(kryo)
|
||||
} finally {
|
||||
kryo.context.clear()
|
||||
}
|
||||
}
|
||||
|
||||
override fun <T : Any> deserialize(byteSequence: ByteSequence, clazz: Class<T>, context: SerializationContext): T {
|
||||
val pool = getPool(context)
|
||||
val headerSize = KryoHeaderV0_1.size
|
||||
val header = byteSequence.take(headerSize)
|
||||
if (header != KryoHeaderV0_1) {
|
||||
throw KryoException("Serialized bytes header does not match expected format.")
|
||||
}
|
||||
Input(byteSequence.bytes, byteSequence.offset + headerSize, byteSequence.size - headerSize).use { input ->
|
||||
return pool.run { kryo ->
|
||||
withContext(kryo, context) {
|
||||
if (context.objectReferencesEnabled) {
|
||||
uncheckedCast(kryo.readClassAndObject(input))
|
||||
} else {
|
||||
kryo.withoutReferences { uncheckedCast<Any?, T>(kryo.readClassAndObject(input)) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun <T : Any> serialize(obj: T, context: SerializationContext): SerializedBytes<T> {
|
||||
val pool = getPool(context)
|
||||
return pool.run { kryo ->
|
||||
withContext(kryo, context) {
|
||||
serializeOutputStreamPool.run { stream ->
|
||||
serializeBufferPool.run { buffer ->
|
||||
Output(buffer).use {
|
||||
it.outputStream = stream
|
||||
it.writeBytes(KryoHeaderV0_1.bytes)
|
||||
if (context.objectReferencesEnabled) {
|
||||
kryo.writeClassAndObject(it, obj)
|
||||
} else {
|
||||
kryo.withoutReferences { kryo.writeClassAndObject(it, obj) }
|
||||
}
|
||||
}
|
||||
SerializedBytes(stream.toByteArray())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val serializeBufferPool = LazyPool(
|
||||
newInstance = { ByteArray(64 * 1024) }
|
||||
)
|
||||
|
||||
private val serializeOutputStreamPool = LazyPool(
|
||||
clear = ByteArrayOutputStream::reset,
|
||||
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
|
||||
newInstance = { ByteArrayOutputStream(64 * 1024) }
|
||||
)
|
@ -1,4 +1,4 @@
|
||||
package net.corda.nodeapi.internal.serialization
|
||||
package net.corda.nodeapi.internal.serialization.kryo
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
@ -5,6 +5,8 @@ import net.corda.core.serialization.SerializationContext;
|
||||
import net.corda.core.serialization.SerializationFactory;
|
||||
import net.corda.core.serialization.SerializedBytes;
|
||||
import net.corda.testing.SerializationEnvironmentRule;
|
||||
import net.corda.nodeapi.internal.serialization.kryo.CordaClosureBlacklistSerializer;
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -31,7 +33,7 @@ public final class ForbiddenLambdaSerializationTests {
|
||||
EnumSet<SerializationContext.UseCase> contexts = EnumSet.complementOf(EnumSet.of(SerializationContext.UseCase.Checkpoint));
|
||||
|
||||
contexts.forEach(ctx -> {
|
||||
SerializationContext context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx);
|
||||
SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx);
|
||||
|
||||
String value = "Hey";
|
||||
Callable<String> target = (Callable<String> & Serializable) () -> value;
|
||||
@ -54,7 +56,7 @@ public final class ForbiddenLambdaSerializationTests {
|
||||
EnumSet<SerializationContext.UseCase> contexts = EnumSet.complementOf(EnumSet.of(SerializationContext.UseCase.Checkpoint));
|
||||
|
||||
contexts.forEach(ctx -> {
|
||||
SerializationContext context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx);
|
||||
SerializationContext context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, ctx);
|
||||
|
||||
String value = "Hey";
|
||||
Callable<String> target = () -> value;
|
||||
|
@ -5,6 +5,8 @@ import net.corda.core.serialization.SerializationContext;
|
||||
import net.corda.core.serialization.SerializationFactory;
|
||||
import net.corda.core.serialization.SerializedBytes;
|
||||
import net.corda.testing.SerializationEnvironmentRule;
|
||||
import net.corda.nodeapi.internal.serialization.kryo.CordaClosureSerializer;
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoSerializationSchemeKt;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -24,7 +26,7 @@ public final class LambdaCheckpointSerializationTest {
|
||||
@Before
|
||||
public void setup() {
|
||||
factory = testSerialization.env.getSERIALIZATION_FACTORY();
|
||||
context = new SerializationContextImpl(SerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, SerializationContext.UseCase.Checkpoint);
|
||||
context = new SerializationContextImpl(KryoSerializationSchemeKt.getKryoHeaderV0_1(), this.getClass().getClassLoader(), AllWhitelist.INSTANCE, Maps.newHashMap(), true, SerializationContext.UseCase.Checkpoint);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -10,6 +10,8 @@ import net.corda.core.node.services.AttachmentStorage
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.nodeapi.internal.AttachmentsClassLoader
|
||||
import net.corda.nodeapi.internal.AttachmentsClassLoaderTests
|
||||
import net.corda.nodeapi.internal.serialization.kryo.CordaKryo
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.node.MockAttachmentStorage
|
||||
import net.corda.testing.rigorousMock
|
||||
import org.junit.Rule
|
||||
|
@ -13,6 +13,7 @@ import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.ALICE_PUBKEY
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
|
@ -7,6 +7,7 @@ import net.corda.node.services.statemachine.SessionData
|
||||
import net.corda.nodeapi.internal.serialization.amqp.DeserializationInput
|
||||
import net.corda.nodeapi.internal.serialization.amqp.Envelope
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.amqpSpecific
|
||||
import net.corda.testing.kryoSpecific
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
|
@ -6,6 +6,7 @@ import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.statemachine.SessionData
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.amqpSpecific
|
||||
import net.corda.testing.kryoSpecific
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
@ -84,4 +85,4 @@ class MapsSerializationTest {
|
||||
}
|
||||
assertArrayEquals(output.toByteArray(), serializedForm.bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,10 @@ import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.nodeapi.internal.serialization.kryo.CordaKryo
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.TestDependencyInjectionBase
|
||||
import net.corda.testing.rigorousMock
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
|
@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.util.DefaultClassResolver
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.statemachine.SessionData
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.testing.kryoSpecific
|
||||
import net.corda.testing.SerializationEnvironmentRule
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
|
@ -13,7 +13,6 @@ import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationFactory
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.nodeapi.internal.serialization.AbstractAMQPSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.EmptyWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.isPrimitive
|
||||
|
@ -39,6 +39,7 @@ import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.ShutdownHook
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
|
@ -4,10 +4,10 @@ import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.node.services.messaging.RpcServerObservableSerializer
|
||||
import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1
|
||||
import net.corda.nodeapi.internal.serialization.RPCKryo
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.DefaultKryoCustomizer
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.nodeapi.internal.serialization.kryo.RPCKryo
|
||||
|
||||
class KryoServerSerializationScheme : AbstractKryoSerializationScheme() {
|
||||
override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
|
||||
|
@ -14,7 +14,7 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.node.services.config.createKeystoreForCordaNode
|
||||
import net.corda.nodeapi.internal.serialization.AllWhitelist
|
||||
import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
|
||||
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
|
||||
import net.corda.testing.ALICE
|
||||
|
@ -24,7 +24,7 @@ import net.corda.node.utilities.CertificateAndKeyPair
|
||||
import net.corda.node.utilities.CertificateType
|
||||
import net.corda.node.utilities.X509Utilities
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.nodeapi.internal.serialization.AMQP_ENABLED
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQP_ENABLED
|
||||
import org.mockito.internal.stubbing.answers.ThrowsException
|
||||
import org.mockito.stubbing.Answer
|
||||
import java.nio.file.Files
|
||||
|
@ -9,6 +9,8 @@ import net.corda.core.serialization.internal.SerializationEnvironment
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||
import org.junit.rules.TestRule
|
||||
import org.junit.runner.Description
|
||||
import org.junit.runners.model.Statement
|
||||
|
@ -18,8 +18,11 @@ import net.corda.nodeapi.config.NodeSSLConfiguration
|
||||
import net.corda.nodeapi.config.getValue
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
import net.corda.nodeapi.internal.serialization.*
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AbstractAMQPSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.amqp.AmqpHeaderV1_0
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory
|
||||
import net.corda.nodeapi.internal.serialization.kryo.AbstractKryoSerializationScheme
|
||||
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
|
Loading…
x
Reference in New Issue
Block a user