ENT-10284 Performance optimise deserialisation (#7425)

This commit is contained in:
Rick Parker
2023-07-20 09:51:35 +01:00
committed by GitHub
parent bcf4c11420
commit 48213b5f8c
10 changed files with 288 additions and 96 deletions

View File

@ -1,6 +1,7 @@
package net.corda.serialization.internal.amqp
import net.corda.core.KeepForDJVM
import net.corda.core.internal.LazyPool
import net.corda.core.internal.VisibleForTesting
import net.corda.core.serialization.AMQP_ENVELOPE_CACHE_PROPERTY
import net.corda.core.serialization.EncodingWhitelist
@ -18,7 +19,8 @@ import net.corda.serialization.internal.model.TypeIdentifier
import org.apache.qpid.proton.amqp.Binary
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.amqp.UnsignedInteger
import org.apache.qpid.proton.codec.Data
import org.apache.qpid.proton.codec.DecoderImpl
import org.apache.qpid.proton.codec.EncoderImpl
import java.io.InputStream
import java.io.NotSerializableException
import java.lang.reflect.ParameterizedType
@ -72,17 +74,32 @@ class DeserializationInput constructor(
}
}
private val decoderPool = LazyPool<DecoderImpl> {
val decoder = DecoderImpl().apply {
register(Envelope.DESCRIPTOR, Envelope.FastPathConstructor(this))
register(Schema.DESCRIPTOR, Schema)
register(Descriptor.DESCRIPTOR, Descriptor)
register(Field.DESCRIPTOR, Field)
register(CompositeType.DESCRIPTOR, CompositeType)
register(Choice.DESCRIPTOR, Choice)
register(RestrictedType.DESCRIPTOR, RestrictedType)
register(ReferencedObject.DESCRIPTOR, ReferencedObject)
register(TransformsSchema.DESCRIPTOR, TransformsSchema)
register(TransformTypes.DESCRIPTOR, TransformTypes)
}
EncoderImpl(decoder)
decoder
}
@Throws(AMQPNoTypeNotSerializableException::class)
fun getEnvelope(byteSequence: ByteSequence, encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist): Envelope {
fun getEnvelope(byteSequence: ByteSequence, encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist, lazy: Boolean = false): Envelope {
return withDataBytes(byteSequence, encodingWhitelist) { dataBytes ->
val data = Data.Factory.create()
val expectedSize = dataBytes.remaining()
if (data.decode(dataBytes) != expectedSize.toLong()) {
throw AMQPNoTypeNotSerializableException(
"Unexpected size of data",
"Blob is corrupted!.")
decoderPool.reentrantRun {
it.byteBuffer = dataBytes
(it.readObject() as Envelope).apply {
if (!lazy) this.resolvedSchema
}
}
Envelope.get(data)
}
}
}
@ -124,22 +141,29 @@ class DeserializationInput constructor(
fun <T : Any> deserialize(bytes: ByteSequence, clazz: Class<T>, context: SerializationContext): T =
des {
/**
* The cache uses object identity rather than [ByteSequence.equals] and
* [ByteSequence.hashCode]. This is for speed: each [ByteSequence] object
* can potentially be large, and we are optimizing for the case when we
* know we will be deserializing the exact same objects multiple times.
* This also means that the cache MUST be short-lived, as otherwise it
* becomes a memory leak.
* So that the [DecoderImpl] is held whilst we get the [Envelope] and [doReadObject],
* since we are using lazy when getting the [Envelope] which means [Schema] and
* [TransformsSchema] are only parsed out of the [bytes] if demanded by [doReadObject].
*/
@Suppress("unchecked_cast")
val envelope = (context.properties[AMQP_ENVELOPE_CACHE_PROPERTY] as? MutableMap<IdentityKey, Envelope>)
?.computeIfAbsent(IdentityKey(bytes)) { key ->
getEnvelope(key.bytes, context.encodingWhitelist)
} ?: getEnvelope(bytes, context.encodingWhitelist)
decoderPool.reentrantRun {
/**
* The cache uses object identity rather than [ByteSequence.equals] and
* [ByteSequence.hashCode]. This is for speed: each [ByteSequence] object
* can potentially be large, and we are optimizing for the case when we
* know we will be deserializing the exact same objects multiple times.
* This also means that the cache MUST be short-lived, as otherwise it
* becomes a memory leak.
*/
@Suppress("unchecked_cast")
val envelope = (context.properties[AMQP_ENVELOPE_CACHE_PROPERTY] as? MutableMap<IdentityKey, Envelope>)
?.computeIfAbsent(IdentityKey(bytes)) { key ->
getEnvelope(key.bytes, context.encodingWhitelist, true)
} ?: getEnvelope(bytes, context.encodingWhitelist, true)
logger.trace { "deserialize blob scheme=\"${envelope.schema}\"" }
logger.trace { "deserialize blob scheme=\"${envelope.schema}\"" }
doReadObject(envelope, clazz, context)
doReadObject(envelope, clazz, context)
}
}
@Throws(NotSerializableException::class)
@ -155,10 +179,10 @@ class DeserializationInput constructor(
private fun <T: Any> doReadObject(envelope: Envelope, clazz: Class<T>, context: SerializationContext): T {
return clazz.cast(readObjectOrNull(
obj = redescribe(envelope.obj, clazz),
schema = SerializationSchemas(envelope.schema, envelope.transformsSchema),
type = clazz,
context = context
obj = redescribe(envelope.obj, clazz),
schema = SerializationSchemas(envelope::resolvedSchema),
type = clazz,
context = context
))
}

View File

@ -1,70 +1,85 @@
package net.corda.serialization.internal.amqp
import net.corda.core.KeepForDJVM
import org.apache.qpid.proton.ProtonException
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.codec.Data
import org.apache.qpid.proton.codec.DescribedTypeConstructor
import org.apache.qpid.proton.codec.DecoderImpl
import org.apache.qpid.proton.codec.EncodingCodes
import org.apache.qpid.proton.codec.FastPathDescribedTypeConstructor
import java.nio.Buffer
import java.nio.ByteBuffer
/**
* This class wraps all serialized data, so that the schema can be carried along with it. We will provide various
* internal utilities to decompose and recompose with/without schema etc so that e.g. we can store objects with a
* (relationally) normalised out schema to avoid excessive duplication.
*/
// TODO: make the schema parsing lazy since mostly schemas will have been seen before and we only need it if we
// TODO: don't recognise a type descriptor.
@KeepForDJVM
data class Envelope(val obj: Any?, val schema: Schema, val transformsSchema: TransformsSchema) : DescribedType {
companion object : DescribedTypeConstructor<Envelope> {
class Envelope(val obj: Any?, resolveSchema: () -> Pair<Schema, TransformsSchema>) : DescribedType {
val resolvedSchema: Pair<Schema, TransformsSchema> by lazy(resolveSchema)
val schema: Schema get() = resolvedSchema.first
val transformsSchema: TransformsSchema get() = resolvedSchema.second
companion object {
val DESCRIPTOR = AMQPDescriptorRegistry.ENVELOPE.amqpDescriptor
val DESCRIPTOR_OBJECT = Descriptor(null, DESCRIPTOR)
// described list should either be two or three elements long
private const val ENVELOPE_WITHOUT_TRANSFORMS = 2
private const val ENVELOPE_WITH_TRANSFORMS = 3
}
private const val BLOB_IDX = 0
private const val SCHEMA_IDX = 1
private const val TRANSFORMS_SCHEMA_IDX = 2
class FastPathConstructor(private val decoder: DecoderImpl) : FastPathDescribedTypeConstructor<Envelope> {
fun get(data: Data): Envelope {
val describedType = data.`object` as DescribedType
if (describedType.descriptor != DESCRIPTOR) {
throw AMQPNoTypeNotSerializableException(
"Unexpected descriptor ${describedType.descriptor}, should be $DESCRIPTOR.")
private val _buffer: ByteBuffer get() = decoder.byteBuffer
@Suppress("ComplexMethod", "MagicNumber")
private fun readEncodingAndReturnSize(buffer: ByteBuffer, inBytes: Boolean = true): Int {
val encodingCode: Byte = buffer.get()
return when (encodingCode) {
EncodingCodes.LIST8 -> {
(buffer.get().toInt() and 0xff).let { if (inBytes) it else (buffer.get().toInt() and 0xff) }
}
EncodingCodes.LIST32 -> {
buffer.int.let { if (inBytes) it else buffer.int }
}
else -> throw ProtonException("Expected List type but found encoding: $encodingCode")
}
val list = describedType.described as List<*>
// We need to cope with objects serialised without the transforms header element in the
// envelope
val transformSchema: Any? = when (list.size) {
ENVELOPE_WITHOUT_TRANSFORMS -> null
ENVELOPE_WITH_TRANSFORMS -> list[TRANSFORMS_SCHEMA_IDX]
else -> throw AMQPNoTypeNotSerializableException(
"Malformed list, bad length of ${list.size} (should be 2 or 3)")
}
return newInstance(listOf(list[BLOB_IDX], Schema.get(list[SCHEMA_IDX]!!),
TransformsSchema.newInstance(transformSchema)))
}
// This separation of functions is needed as this will be the entry point for the default
// AMQP decoder if one is used (see the unit tests).
override fun newInstance(described: Any?): Envelope {
val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
// We need to cope with objects serialised without the transforms header element in the
// envelope
val transformSchema = when (list.size) {
ENVELOPE_WITHOUT_TRANSFORMS -> TransformsSchema.newInstance(null)
ENVELOPE_WITH_TRANSFORMS -> list[TRANSFORMS_SCHEMA_IDX] as TransformsSchema
else -> throw AMQPNoTypeNotSerializableException(
"Malformed list, bad length of ${list.size} (should be 2 or 3)")
override fun readValue(): Envelope? {
val buffer = _buffer
val size = readEncodingAndReturnSize(buffer, false)
if (size != ENVELOPE_WITHOUT_TRANSFORMS && size != ENVELOPE_WITH_TRANSFORMS) {
throw AMQPNoTypeNotSerializableException("Malformed list, bad length of $size (should be 2 or 3)")
}
return Envelope(list[BLOB_IDX], list[SCHEMA_IDX] as Schema, transformSchema)
val data = Data.Factory.create()
data.decode(buffer)
val obj = data.`object`
val lambda: () -> Pair<Schema, TransformsSchema> = {
data.decode(buffer)
val schema = data.`object`
val transformsSchema = if (size > 2) {
data.decode(buffer)
data.`object`
} else null
Schema.get(schema) to TransformsSchema.newInstance(transformsSchema)
}
return Envelope(obj, lambda)
}
override fun getTypeClass(): Class<*> = Envelope::class.java
override fun skipValue() {
val buffer = _buffer
val size = readEncodingAndReturnSize(buffer)
(buffer as Buffer).position(buffer.position() + size)
}
override fun encodesJavaPrimitive(): Boolean = false
override fun getTypeClass(): Class<Envelope> = Envelope::class.java
}
override fun getDescriptor(): Any = DESCRIPTOR

View File

@ -5,9 +5,13 @@ import net.corda.core.internal.uncheckedCast
import net.corda.serialization.internal.CordaSerializationMagic
import net.corda.serialization.internal.amqp.AMQPTypeIdentifiers.isPrimitive
import net.corda.serialization.internal.model.TypeIdentifier
import net.corda.serialization.internal.model.TypeIdentifier.TopType
import net.corda.serialization.internal.model.TypeIdentifier.Companion.forGenericType
import org.apache.qpid.proton.amqp.*
import net.corda.serialization.internal.model.TypeIdentifier.TopType
import org.apache.qpid.proton.amqp.Binary
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.amqp.UnsignedInteger
import org.apache.qpid.proton.amqp.UnsignedLong
import org.apache.qpid.proton.codec.DescribedTypeConstructor
import java.io.NotSerializableException
import java.lang.reflect.Type

View File

@ -5,7 +5,17 @@ import java.io.NotSerializableException
import javax.annotation.concurrent.ThreadSafe
@KeepForDJVM
data class SerializationSchemas(val schema: Schema, val transforms: TransformsSchema)
class SerializationSchemas(resolveSchema: () -> Pair<Schema, TransformsSchema>) {
constructor(schema: Schema, transforms: TransformsSchema) : this({ schema to transforms })
private val resolvedSchema: Pair<Schema, TransformsSchema> by lazy(resolveSchema)
val schema: Schema get() = resolvedSchema.first
val transforms: TransformsSchema get() = resolvedSchema.second
operator fun component1(): Schema = schema
operator fun component2(): TransformsSchema = transforms
}
/**
* Factory of serializers designed to be shared across threads and invocations.

View File

@ -183,7 +183,7 @@ public class JavaSerializationOutputTests {
DecoderImpl decoder = new DecoderImpl();
decoder.register(Envelope.Companion.getDESCRIPTOR(), Envelope.Companion);
decoder.register(Envelope.Companion.getDESCRIPTOR(), new Envelope.FastPathConstructor(decoder));
decoder.register(Schema.Companion.getDESCRIPTOR(), Schema.Companion);
decoder.register(Descriptor.Companion.getDESCRIPTOR(), Descriptor.Companion);
decoder.register(Field.Companion.getDESCRIPTOR(), Field.Companion);