ENT-12070 AMQP Serialisation performance improvements (#7778)

* Serialization performance test of creating wire transaction
* Initial serialization refactoring to enable future caching of schema
* Add caching of schema
* Move encoder pool to the companion object so it actually gets re-used!
* Slightly better cache concurrency for LocalSerializerFactory
* Upgrade grgit to 4.1.1 as 4.0.0 seems to have vanished
Rick Parker 2024-10-10 17:22:07 +01:00 committed by GitHub
parent d721bb7f3e
commit 852127c648
9 changed files with 342 additions and 40 deletions
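
At a glance, the diff below memoises the (Schema, TransformsSchema) pair per distinct set of type notations inside the serializer factory, and pools the Proton-J encoder so cached schema bytes can be written straight to the output stream. A minimal, self-contained sketch of the caching pattern (HashCachedKey and MemoisingCache are illustrative names; the real code is in DefaultLocalSerializerFactory further down):

import java.util.concurrent.ConcurrentHashMap

// Illustrative key wrapper: computes the set's hash code once, mirroring the CachingSet added in
// this commit, because schema cache keys are large sets that would otherwise be rehashed on every
// lookup.
private class HashCachedKey<T>(elements: Set<T>) : LinkedHashSet<T>(elements) {
    private val cachedHash = super.hashCode()
    override fun hashCode(): Int = cachedHash
    override fun equals(other: Any?): Boolean = super.equals(other)
}

// Illustrative cache: build the expensive value once per distinct key set and then serve it from a
// ConcurrentHashMap, as DefaultLocalSerializerFactory.getCachedSchema does for schemas.
class MemoisingCache<T, V : Any>(private val build: (Set<T>) -> V) {
    private val cache = ConcurrentHashMap<Set<T>, V>()
    fun get(key: Set<T>): V = cache.getOrPut(HashCachedKey(key)) { build(key) }
}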

@@ -919,4 +919,26 @@ class CashTests {
assertEquals(2, wtx.commands.size)
}
@Test(timeout = 300_000)
fun performanceTest() {
val tx = TransactionBuilder(dummyNotary.party)
database.transaction {
val payments = listOf(
PartyAndAmount(miniCorpAnonymised, 400.DOLLARS),
PartyAndAmount(charlie.party.anonymise(), 150.DOLLARS)
)
CashUtils.generateSpend(ourServices, tx, payments, ourServices.myInfo.singleIdentityAndCert())
}
val counts = 1000
val loops = 50
for (loop in 0 until loops) {
val start = System.nanoTime()
for (count in 0 until counts) {
tx.toWireTransaction(ourServices)
}
val end = System.nanoTime()
println("Time per transaction serialize on loop $loop = ${(end - start) / counts} nanoseconds")
}
}
}

@@ -775,6 +775,34 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi
assertEquals(desState.encumbrance, state.encumbrance)
}
@Test(timeout = 300_000)
fun performanceTest() {
val state = TransactionState(FooState(), FOO_PROGRAM_ID, MEGA_CORP)
val scheme = AMQPServerSerializationScheme(emptyList())
val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" }
.java.getDeclaredMethod("registerCustomSerializers",
SerializationContext::class.java,
SerializerFactory::class.java)
func.isAccessible = true
val factory = SerializerFactoryBuilder.build(AllWhitelist,
ClassCarpenterImpl(AllWhitelist, ClassLoader.getSystemClassLoader())
)
func.invoke(scheme, testSerializationContext, factory)
val ser = SerializationOutput(factory)
val counts = 1000
val loops = 50
for (loop in 0 until loops) {
val start = System.nanoTime()
for (count in 0 until counts) {
ser.serialize(state, compression)
}
val end = System.nanoTime()
println("Time per transaction state serialize on loop $loop = ${(end - start) / counts} nanoseconds")
}
}
@Test(timeout=300_000)
fun `test currencies serialize`() {
val factory = SerializerFactoryBuilder.build(AllWhitelist,

@@ -5,12 +5,15 @@ import net.corda.core.serialization.ClassWhitelist
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.serialization.internal.model.*
import net.corda.serialization.internal.model.TypeIdentifier.*
import net.corda.serialization.internal.model.FingerPrinter
import net.corda.serialization.internal.model.LocalTypeInformation
import net.corda.serialization.internal.model.LocalTypeModel
import net.corda.serialization.internal.model.TypeIdentifier
import net.corda.serialization.internal.model.TypeIdentifier.Parameterised
import org.apache.qpid.proton.amqp.Symbol
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.util.*
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function
import java.util.function.Predicate
@@ -82,6 +85,8 @@ interface LocalSerializerFactory {
* when serialising and deserialising.
*/
fun isSuitableForObjectReference(type: Type): Boolean
/**
* Returns the cached [Schema] and [TransformsSchema] pair for the given set of type notations,
* building and caching the pair on first use.
*/
fun getCachedSchema(types: Set<TypeNotation>): Pair<Schema, TransformsSchema>
}
/**
@@ -277,4 +282,24 @@ class DefaultLocalSerializerFactory(
}
}
private val schemaCache = ConcurrentHashMap<Set<TypeNotation>, Pair<Schema, TransformsSchema>>()
override fun getCachedSchema(types: Set<TypeNotation>): Pair<Schema, TransformsSchema> {
val cacheKey = CachingSet(types)
return schemaCache.getOrPut(cacheKey) {
val schema = Schema(cacheKey.toList())
schema to TransformsSchema.build(schema, this)
}
}
// LinkedHashSet that computes its hash code once at construction: cache keys are large sets of
// type notations, so recomputing the hash on every lookup would be needlessly expensive.
private class CachingSet<T>(existingSet: Set<T>) : LinkedHashSet<T>(existingSet) {
override val size: Int = super.size
private val hashCode = super.hashCode()
override fun hashCode(): Int {
return hashCode
}
override fun equals(other: Any?): Boolean {
return super.equals(other)
}
}
}

@@ -0,0 +1,83 @@
package net.corda.serialization.internal.amqp
import org.apache.qpid.proton.codec.ReadableBuffer
import org.apache.qpid.proton.codec.WritableBuffer
import java.io.OutputStream
import java.nio.ByteBuffer
/**
* This class is just a wrapper around an [OutputStream] for Proton-J Encoder. Only the methods
* we are actively using are implemented and tested.
*/
@Suppress("MagicNumber")
class OutputStreamWritableBuffer(private val stream: OutputStream) : WritableBuffer {
private val writeBuffer = ByteArray(8)
override fun put(b: Byte) {
stream.write(b.toInt())
}
override fun put(src: ByteArray, offset: Int, length: Int) {
stream.write(src, offset, length)
}
override fun put(payload: ByteBuffer) {
throw UnsupportedOperationException()
}
override fun put(payload: ReadableBuffer?) {
throw UnsupportedOperationException()
}
override fun putFloat(f: Float) {
throw UnsupportedOperationException()
}
override fun putDouble(d: Double) {
throw UnsupportedOperationException()
}
override fun putShort(s: Short) {
throw UnsupportedOperationException()
}
override fun putInt(i: Int) {
writeBuffer[0] = (i ushr 24).toByte()
writeBuffer[1] = (i ushr 16).toByte()
writeBuffer[2] = (i ushr 8).toByte()
writeBuffer[3] = (i ushr 0).toByte()
put(writeBuffer, 0, 4)
}
override fun putLong(v: Long) {
writeBuffer[0] = (v ushr 56).toByte()
writeBuffer[1] = (v ushr 48).toByte()
writeBuffer[2] = (v ushr 40).toByte()
writeBuffer[3] = (v ushr 32).toByte()
writeBuffer[4] = (v ushr 24).toByte()
writeBuffer[5] = (v ushr 16).toByte()
writeBuffer[6] = (v ushr 8).toByte()
writeBuffer[7] = (v ushr 0).toByte()
put(writeBuffer, 0, 8)
}
override fun hasRemaining(): Boolean {
// The backing stream is unbounded from the encoder's point of view, so always report space.
return true
}
override fun remaining(): Int {
throw UnsupportedOperationException()
}
override fun position(): Int {
throw UnsupportedOperationException()
}
override fun position(position: Int) {
throw UnsupportedOperationException()
}
override fun limit(): Int {
throw UnsupportedOperationException()
}
}

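The SerializationOutput change later in this diff consumes the buffer by handing it to Proton-J's EncoderImpl via setByteBuffer. A hedged usage sketch, assuming the value being written only needs the put/putInt/putLong operations implemented above (a String, for example):

import org.apache.qpid.proton.codec.DecoderImpl
import org.apache.qpid.proton.codec.EncoderImpl
import java.io.ByteArrayOutputStream

fun encodeToStreamSketch(value: String): ByteArray {
    val stream = ByteArrayOutputStream()
    val encoder = EncoderImpl(DecoderImpl())
    // Point the encoder at the OutputStream-backed buffer so encoded bytes go straight to the
    // stream instead of through an intermediate, pre-sized ByteBuffer.
    encoder.setByteBuffer(OutputStreamWritableBuffer(stream))
    encoder.writeObject(value)
    return stream.toByteArray()
}
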
@@ -11,7 +11,11 @@ import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.amqp.UnsignedInteger
import org.apache.qpid.proton.amqp.UnsignedLong
import org.apache.qpid.proton.codec.AMQPType
import org.apache.qpid.proton.codec.Data
import org.apache.qpid.proton.codec.DescribedTypeConstructor
import org.apache.qpid.proton.codec.EncoderImpl
import org.apache.qpid.proton.codec.TypeEncoding
import java.io.NotSerializableException
import java.lang.reflect.Type
@@ -50,7 +54,7 @@ private class RedescribedType(
* This and the classes below are OO representations of the AMQP XML schema described in the specification. Their
* [toString] representations generate the associated XML form.
*/
data class Schema(val types: List<TypeNotation>) : DescribedType {
data class Schema(val types: List<TypeNotation>) : CachingDescribedType, DescribedType {
companion object : DescribedTypeConstructor<Schema> {
val DESCRIPTOR = AMQPDescriptorRegistry.SCHEMA.amqpDescriptor
@@ -74,8 +78,78 @@ data class Schema(val types: List<TypeNotation>) : DescribedType {
override fun getDescriptor(): Any = DESCRIPTOR
override fun getDescribed(): Any = listOf(types)
override fun toString(): String = types.joinToString("\n")
// Encoded once and cached so CachingDescribedAMQPType can write these bytes directly on reuse.
override val bytes: ByteArray by lazy {
val data = Data.Factory.create()
data.putObject(this)
data.encode().array
}
}
/** A described type whose AMQP-encoded bytes are computed once and then reused verbatim. */
interface CachingDescribedType {
val bytes: ByteArray
}
/** Eagerly runs [dataWriter] against a fresh [Data] and captures the resulting encoded bytes. */
class CachingWrapper(dataWriter: (Data) -> Unit) : CachingDescribedType {
override val bytes: ByteArray = let {
val data = Data.Factory.create()
dataWriter(data)
data.encode().array
}
}
/**
* Proton-J [AMQPType] that writes the pre-computed bytes of a [CachingDescribedType] straight into
* the encoder's buffer, so cached values such as [Schema] and [TransformsSchema] are not re-encoded.
*/
class CachingDescribedAMQPType<T : CachingDescribedType>(private val type: Class<T>, private val encoder: EncoderImpl) : AMQPType<T> {
override fun getTypeClass(): Class<T> {
return type
}
override fun getCanonicalEncoding(): TypeEncoding<T> {
throw UnsupportedOperationException()
}
override fun getAllEncodings(): MutableCollection<out TypeEncoding<T>> {
throw UnsupportedOperationException()
}
override fun write(obj: T) {
val bytes = obj.bytes
encoder.buffer.put(bytes, 0, bytes.size)
}
override fun getEncoding(obj: T): TypeEncoding<T> {
return object : TypeEncoding<T> {
override fun getType(): AMQPType<T> {
return this@CachingDescribedAMQPType
}
override fun writeConstructor() {
}
override fun getConstructorSize(): Int {
return 0
}
override fun isFixedSizeVal(): Boolean {
return false
}
override fun encodesJavaPrimitive(): Boolean {
return false
}
override fun encodesSuperset(encoder: TypeEncoding<T>?): Boolean {
return false
}
override fun getValueSize(obj: T): Int {
return obj.bytes.size
}
override fun writeValue(obj: T) {
write(obj)
}
}
}
}
data class Descriptor(val name: Symbol?, val code: UnsignedLong? = null) : DescribedType {
@@ -215,6 +289,16 @@ data class CompositeType(
override fun getDescribed(): Any = listOf(name, label, provides, descriptor, fields)
// Hash and compare on the descriptor alone (with the hash cached): type notations end up in sets
// and schema-cache keys, where full structural hashing and comparison would be expensive.
private val hashCode = descriptor.hashCode()
override fun hashCode(): Int {
return hashCode
}
override fun equals(other: Any?): Boolean {
if (other !is TypeNotation) return false
return descriptor == other.descriptor
}
override fun toString(): String {
val sb = StringBuilder("<type class=\"composite\" name=\"$name\"")
if (!label.isNullOrBlank()) {
@@ -264,6 +348,16 @@ data class RestrictedType(override val name: String,
override fun getDescribed(): Any = listOf(name, label, provides, source, descriptor, choices)
// As with CompositeType: hash and compare on the (cached) descriptor only.
private val hashCode = descriptor.hashCode()
override fun hashCode(): Int {
return hashCode
}
override fun equals(other: Any?): Boolean {
if (other !is TypeNotation) return false
return descriptor == other.descriptor
}
override fun toString(): String {
val sb = StringBuilder("<type class=\"restricted\" name=\"$name\"")
if (!label.isNullOrBlank()) {

@@ -1,5 +1,6 @@
package net.corda.serialization.internal.amqp
import net.corda.core.internal.LazyPool
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.contextLogger
@@ -8,12 +9,13 @@ import net.corda.serialization.internal.SectionId
import net.corda.serialization.internal.byteArrayOutput
import net.corda.serialization.internal.model.TypeIdentifier
import org.apache.qpid.proton.codec.Data
import org.apache.qpid.proton.codec.DecoderImpl
import org.apache.qpid.proton.codec.EncoderImpl
import java.io.NotSerializableException
import java.io.OutputStream
import java.lang.reflect.Type
import java.lang.reflect.WildcardType
import java.util.*
import kotlin.collections.LinkedHashSet
data class BytesAndSchemas<T : Any>(
val obj: SerializedBytes<T>,
@@ -31,6 +33,15 @@ open class SerializationOutput constructor(
) {
companion object {
private val logger = contextLogger()
// Kept in the companion object so the pool, and therefore its encoders, is shared and actually
// reused across SerializationOutput instances.
private val encoderPool = LazyPool<EncoderImpl> {
EncoderImpl(DecoderImpl()).apply {
registerDescribedType(Envelope::class.java, Envelope.DESCRIPTOR)
register(CachingDescribedAMQPType(CachingWrapper::class.java, this))
register(CachingDescribedAMQPType(Schema::class.java, this))
register(CachingDescribedAMQPType(TransformsSchema::class.java, this))
}
}
}
private val objectHistory: MutableMap<Any, Int> = IdentityHashMap()
@@ -74,15 +85,6 @@ open class SerializationOutput constructor(
}
internal fun <T : Any> _serialize(obj: T, context: SerializationContext): SerializedBytes<T> {
val data = Data.Factory.create()
data.withDescribed(Envelope.DESCRIPTOR_OBJECT) {
withList {
writeObject(obj, this, context)
val schema = Schema(schemaHistory.toList())
writeSchema(schema, this)
writeTransformSchema(TransformsSchema.build(schema, serializerFactory), this)
}
}
return SerializedBytes(byteArrayOutput {
var stream: OutputStream = it
try {
@@ -94,7 +96,16 @@ open class SerializationOutput constructor(
stream = encoding.wrap(stream)
}
SectionId.DATA_AND_STOP.writeTo(stream)
stream.alsoAsByteBuffer(data.encodedSize().toInt(), data::encode)
// Borrow a pooled encoder, point it at the output stream for this serialisation, and restore
// its previous buffer afterwards.
encoderPool.reentrantRun { encoderImpl ->
val previousBuffer = encoderImpl.buffer
encoderImpl.setByteBuffer(OutputStreamWritableBuffer(stream))
encoderImpl.writeObject(Envelope(CachingWrapper { data ->
writeObject(obj, data, context)
}) {
serializerFactory.getCachedSchema(schemaHistory)
})
encoderImpl.setByteBuffer(previousBuffer)
}
} finally {
stream.close()
}
@@ -105,14 +116,6 @@ open class SerializationOutput constructor(
writeObject(obj, data, obj.javaClass, context)
}
open fun writeSchema(schema: Schema, data: Data) {
data.putObject(schema)
}
open fun writeTransformSchema(transformsSchema: TransformsSchema, data: Data) {
data.putObject(transformsSchema)
}
internal fun writeObjectOrNull(obj: Any?, data: Data, type: Type, context: SerializationContext, debugIndent: Int) {
if (obj == null) {
data.putNull()

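The encoder pool above lives in the companion object, which is what makes it shared across SerializationOutput instances. A small sketch of the same LazyPool pattern, under the assumption that reentrantRun borrows an instance (building one lazily when needed) and returns it to the pool afterwards; ExpensiveResource is a hypothetical stand-in for EncoderImpl:

import net.corda.core.internal.LazyPool

// Hypothetical stand-in for an object that is costly to construct, such as Proton-J's EncoderImpl.
class ExpensiveResource

object ResourcePoolSketch {
    // Living in a singleton (an object here, a companion object in the commit) keeps the pool and
    // its pooled instances alive between calls instead of rebuilding them each time.
    private val pool = LazyPool<ExpensiveResource> { ExpensiveResource() }

    fun use(work: (ExpensiveResource) -> Unit) {
        // Assumed semantics: reentrantRun hands a pooled instance to the block and reclaims it after.
        pool.reentrantRun(work)
    }
}
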
@@ -4,9 +4,10 @@ import net.corda.core.serialization.CordaSerializationTransformEnumDefault
import net.corda.core.serialization.CordaSerializationTransformRename
import net.corda.serialization.internal.model.LocalTypeInformation
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.codec.Data
import org.apache.qpid.proton.codec.DescribedTypeConstructor
import java.io.NotSerializableException
import java.util.*
import java.util.EnumMap
// NOTE: We are effectively going to replicate the annotations, we need to do this because
// we can't instantiate instances of those annotation classes and this code needs to
@@ -243,7 +244,7 @@ object TransformsAnnotationProcessor {
* @property types maps class names to a map of transformation types. In turn those transformation types
* are each a list of instances of that transform.
*/
data class TransformsSchema(val types: Map<String, EnumMap<TransformTypes, MutableList<Transform>>>) : DescribedType {
data class TransformsSchema(val types: Map<String, EnumMap<TransformTypes, MutableList<Transform>>>) : CachingDescribedType, DescribedType {
companion object : DescribedTypeConstructor<TransformsSchema> {
val DESCRIPTOR = AMQPDescriptorRegistry.TRANSFORM_SCHEMA.amqpDescriptor
@@ -341,6 +342,12 @@ data class TransformsSchema(val types: Map<String, EnumMap<TransformTypes, Mutab
return sb.toString()
}
// As with Schema: encoded once and cached for direct reuse by CachingDescribedAMQPType.
override val bytes: ByteArray by lazy {
val data = Data.Factory.create()
data.putObject(this)
data.encode().array
}
}
private fun String.esc() = "\"$this\""

@@ -0,0 +1,54 @@
package net.corda.serialization.internal.amqp
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader
import org.junit.Test
import java.io.ByteArrayOutputStream
import kotlin.test.assertEquals
class OutputStreamWritableBufferTests {
@Test(timeout = 300_000)
fun testByte() {
val stream = ByteArrayOutputStream()
val buffer = OutputStreamWritableBuffer(stream)
var b = Byte.MIN_VALUE
while (b <= Byte.MAX_VALUE) {
buffer.put(b)
if (b == Byte.MAX_VALUE) break
b++
}
stream.close()
b = Byte.MIN_VALUE
val bytes = stream.toByteArray()
for (byte in bytes) {
assertEquals(b++, byte)
}
}
@Test(timeout = 300_000)
fun testInt() {
val stream = ByteArrayOutputStream()
val buffer = OutputStreamWritableBuffer(stream)
buffer.putInt(Int.MIN_VALUE)
buffer.putInt(Int.MAX_VALUE)
stream.close()
val reader = ByteBufferReader.wrap(stream.toByteArray())
assertEquals(Int.MIN_VALUE, reader.int)
assertEquals(Int.MAX_VALUE, reader.int)
}
@Test(timeout = 300_000)
fun testLong() {
val stream = ByteArrayOutputStream()
val buffer = OutputStreamWritableBuffer(stream)
buffer.putLong(Long.MIN_VALUE)
buffer.putLong(Long.MAX_VALUE)
stream.close()
val reader = ByteBufferReader.wrap(stream.toByteArray())
assertEquals(Long.MIN_VALUE, reader.long)
assertEquals(Long.MAX_VALUE, reader.long)
}
}

@@ -9,7 +9,6 @@ import net.corda.serialization.internal.AllWhitelist
import net.corda.serialization.internal.EmptyWhitelist
import net.corda.serialization.internal.amqp.*
import net.corda.serialization.internal.carpenter.ClassCarpenterImpl
import org.apache.qpid.proton.codec.Data
import org.junit.Test
import java.io.File.separatorChar
import java.io.NotSerializableException
@@ -63,19 +62,6 @@ class TestSerializationOutput(
serializerFactory: SerializerFactory = testDefaultFactory())
: SerializationOutput(serializerFactory) {
override fun writeSchema(schema: Schema, data: Data) {
if (verbose) println(schema)
super.writeSchema(schema, data)
}
override fun writeTransformSchema(transformsSchema: TransformsSchema, data: Data) {
if(verbose) {
println ("Writing Transform Schema")
println (transformsSchema)
}
super.writeTransformSchema(transformsSchema, data)
}
@Throws(NotSerializableException::class)
fun <T : Any> serialize(obj: T): SerializedBytes<T> {
try {