diff --git a/finance/contracts/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt b/finance/contracts/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt index 9ac767da8d..9315f4898d 100644 --- a/finance/contracts/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt +++ b/finance/contracts/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt @@ -919,4 +919,26 @@ class CashTests { assertEquals(2, wtx.commands.size) } + + @Test(timeout = 300_000) + fun performanceTest() { + val tx = TransactionBuilder(dummyNotary.party) + database.transaction { + val payments = listOf( + PartyAndAmount(miniCorpAnonymised, 400.DOLLARS), + PartyAndAmount(charlie.party.anonymise(), 150.DOLLARS) + ) + CashUtils.generateSpend(ourServices, tx, payments, ourServices.myInfo.singleIdentityAndCert()) + } + val counts = 1000 + val loops = 50 + for (loop in 0 until loops) { + val start = System.nanoTime() + for (count in 0 until counts) { + tx.toWireTransaction(ourServices) + } + val end = System.nanoTime() + println("Time per transaction serialize on loop $loop = ${(end - start) / counts} nanoseconds") + } + } } diff --git a/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt b/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt index 07507ae219..9b7bd0a1b0 100644 --- a/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt +++ b/serialization-tests/src/test/kotlin/net/corda/serialization/internal/amqp/SerializationOutputTests.kt @@ -775,6 +775,34 @@ class SerializationOutputTests(private val compression: CordaSerializationEncodi assertEquals(desState.encumbrance, state.encumbrance) } + @Test(timeout = 300_000) + fun performanceTest() { + val state = TransactionState(FooState(), FOO_PROGRAM_ID, MEGA_CORP) + val scheme = AMQPServerSerializationScheme(emptyList()) + val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" } + .java.getDeclaredMethod("registerCustomSerializers", + SerializationContext::class.java, + SerializerFactory::class.java) + func.isAccessible = true + + val factory = SerializerFactoryBuilder.build(AllWhitelist, + ClassCarpenterImpl(AllWhitelist, ClassLoader.getSystemClassLoader()) + ) + func.invoke(scheme, testSerializationContext, factory) + val ser = SerializationOutput(factory) + + val counts = 1000 + val loops = 50 + for (loop in 0 until loops) { + val start = System.nanoTime() + for (count in 0 until counts) { + ser.serialize(state, compression) + } + val end = System.nanoTime() + println("Time per transaction state serialize on loop $loop = ${(end - start) / counts} nanoseconds") + } + } + @Test(timeout=300_000) fun `test currencies serialize`() { val factory = SerializerFactoryBuilder.build(AllWhitelist, diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt index 2106818434..1ed05feade 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/LocalSerializerFactory.kt @@ -5,12 +5,15 @@ import net.corda.core.serialization.ClassWhitelist import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.core.utilities.trace -import net.corda.serialization.internal.model.* -import net.corda.serialization.internal.model.TypeIdentifier.* +import net.corda.serialization.internal.model.FingerPrinter +import net.corda.serialization.internal.model.LocalTypeInformation +import net.corda.serialization.internal.model.LocalTypeModel +import net.corda.serialization.internal.model.TypeIdentifier +import net.corda.serialization.internal.model.TypeIdentifier.Parameterised import org.apache.qpid.proton.amqp.Symbol import java.lang.reflect.ParameterizedType import java.lang.reflect.Type -import java.util.* +import java.util.Optional import java.util.concurrent.ConcurrentHashMap import java.util.function.Function import java.util.function.Predicate @@ -82,6 +85,8 @@ interface LocalSerializerFactory { * when serialising and deserialising. */ fun isSuitableForObjectReference(type: Type): Boolean + + fun getCachedSchema(types: Set): Pair } /** @@ -277,4 +282,24 @@ class DefaultLocalSerializerFactory( } } + private val schemaCache = ConcurrentHashMap, Pair>() + + override fun getCachedSchema(types: Set): Pair { + val cacheKey = CachingSet(types) + return schemaCache.getOrPut(cacheKey) { + val schema = Schema(cacheKey.toList()) + schema to TransformsSchema.build(schema, this) + } + } + + private class CachingSet(exisitingSet: Set) : LinkedHashSet(exisitingSet) { + override val size: Int = super.size + private val hashCode = super.hashCode() + override fun hashCode(): Int { + return hashCode + } + override fun equals(other: Any?): Boolean { + return super.equals(other) + } + } } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/OutputStreamWritableBuffer.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/OutputStreamWritableBuffer.kt new file mode 100644 index 0000000000..c3a42d15de --- /dev/null +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/OutputStreamWritableBuffer.kt @@ -0,0 +1,83 @@ +package net.corda.serialization.internal.amqp + +import org.apache.qpid.proton.codec.ReadableBuffer +import org.apache.qpid.proton.codec.WritableBuffer +import java.io.OutputStream +import java.nio.ByteBuffer + +/** + * This class is just a wrapper around an [OutputStream] for Proton-J Encoder. Only the methods + * we are actively using are implemented and tested. + */ +@Suppress("MagicNumber") +class OutputStreamWritableBuffer(private val stream: OutputStream) : WritableBuffer { + private val writeBuffer = ByteArray(8) + + override fun put(b: Byte) { + stream.write(b.toInt()) + } + + override fun put(src: ByteArray, offset: Int, length: Int) { + stream.write(src, offset, length) + } + + override fun put(payload: ByteBuffer) { + throw UnsupportedOperationException() + } + + override fun put(payload: ReadableBuffer?) { + throw UnsupportedOperationException() + } + + override fun putFloat(f: Float) { + throw UnsupportedOperationException() + } + + override fun putDouble(d: Double) { + throw UnsupportedOperationException() + } + + override fun putShort(s: Short) { + throw UnsupportedOperationException() + } + + override fun putInt(i: Int) { + writeBuffer[0] = (i ushr 24).toByte() + writeBuffer[1] = (i ushr 16).toByte() + writeBuffer[2] = (i ushr 8).toByte() + writeBuffer[3] = (i ushr 0).toByte() + put(writeBuffer, 0, 4) + } + + override fun putLong(v: Long) { + writeBuffer[0] = (v ushr 56).toByte() + writeBuffer[1] = (v ushr 48).toByte() + writeBuffer[2] = (v ushr 40).toByte() + writeBuffer[3] = (v ushr 32).toByte() + writeBuffer[4] = (v ushr 24).toByte() + writeBuffer[5] = (v ushr 16).toByte() + writeBuffer[6] = (v ushr 8).toByte() + writeBuffer[7] = (v ushr 0).toByte() + put(writeBuffer, 0, 8) + } + + override fun hasRemaining(): Boolean { + return true + } + + override fun remaining(): Int { + throw UnsupportedOperationException() + } + + override fun position(): Int { + throw UnsupportedOperationException() + } + + override fun position(position: Int) { + throw UnsupportedOperationException() + } + + override fun limit(): Int { + throw UnsupportedOperationException() + } +} diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt index 46d85fbd1a..73567041dc 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt @@ -11,7 +11,11 @@ import org.apache.qpid.proton.amqp.DescribedType import org.apache.qpid.proton.amqp.Symbol import org.apache.qpid.proton.amqp.UnsignedInteger import org.apache.qpid.proton.amqp.UnsignedLong +import org.apache.qpid.proton.codec.AMQPType +import org.apache.qpid.proton.codec.Data import org.apache.qpid.proton.codec.DescribedTypeConstructor +import org.apache.qpid.proton.codec.EncoderImpl +import org.apache.qpid.proton.codec.TypeEncoding import java.io.NotSerializableException import java.lang.reflect.Type @@ -50,7 +54,7 @@ private class RedescribedType( * This and the classes below are OO representations of the AMQP XML schema described in the specification. Their * [toString] representations generate the associated XML form. */ -data class Schema(val types: List) : DescribedType { +data class Schema(val types: List) : CachingDescribedType, DescribedType { companion object : DescribedTypeConstructor { val DESCRIPTOR = AMQPDescriptorRegistry.SCHEMA.amqpDescriptor @@ -74,8 +78,78 @@ data class Schema(val types: List) : DescribedType { override fun getDescriptor(): Any = DESCRIPTOR override fun getDescribed(): Any = listOf(types) - override fun toString(): String = types.joinToString("\n") + override val bytes: ByteArray by lazy { + val data = Data.Factory.create() + data.putObject(this) + data.encode().array + } +} + +interface CachingDescribedType { + val bytes: ByteArray +} + +class CachingWrapper(dataWriter: (Data) -> Unit) : CachingDescribedType { + override val bytes: ByteArray = let { + val data = Data.Factory.create() + dataWriter(data) + data.encode().array + } +} + +class CachingDescribedAMQPType(private val type: Class, private val encoder: EncoderImpl) : AMQPType { + override fun getTypeClass(): Class { + return type + } + + override fun getCanonicalEncoding(): TypeEncoding { + throw UnsupportedOperationException() + } + + override fun getAllEncodings(): MutableCollection> { + throw UnsupportedOperationException() + } + + override fun write(obj: T) { + val bytes = obj.bytes + encoder.buffer.put(bytes, 0, bytes.size) + } + + override fun getEncoding(obj: T): TypeEncoding { + return object : TypeEncoding { + override fun getType(): AMQPType { + return this@CachingDescribedAMQPType + } + + override fun writeConstructor() { + } + + override fun getConstructorSize(): Int { + return 0 + } + + override fun isFixedSizeVal(): Boolean { + return false + } + + override fun encodesJavaPrimitive(): Boolean { + return false + } + + override fun encodesSuperset(encoder: TypeEncoding?): Boolean { + return false + } + + override fun getValueSize(obj: T): Int { + return obj.bytes.size + } + + override fun writeValue(obj: T) { + write(obj) + } + } + } } data class Descriptor(val name: Symbol?, val code: UnsignedLong? = null) : DescribedType { @@ -215,6 +289,16 @@ data class CompositeType( override fun getDescribed(): Any = listOf(name, label, provides, descriptor, fields) + private val hashCode = descriptor.hashCode() + override fun hashCode(): Int { + return hashCode + } + + override fun equals(other: Any?): Boolean { + if(other !is TypeNotation) return false + return descriptor.equals(other.descriptor) + } + override fun toString(): String { val sb = StringBuilder("( val obj: SerializedBytes, @@ -31,6 +33,15 @@ open class SerializationOutput constructor( ) { companion object { private val logger = contextLogger() + + private val encoderPool = LazyPool { + EncoderImpl(DecoderImpl()).apply { + registerDescribedType(Envelope::class.java, Envelope.DESCRIPTOR) + register(CachingDescribedAMQPType(CachingWrapper::class.java, this)) + register(CachingDescribedAMQPType(Schema::class.java, this)) + register(CachingDescribedAMQPType(TransformsSchema::class.java, this)) + } + } } private val objectHistory: MutableMap = IdentityHashMap() @@ -74,15 +85,6 @@ open class SerializationOutput constructor( } internal fun _serialize(obj: T, context: SerializationContext): SerializedBytes { - val data = Data.Factory.create() - data.withDescribed(Envelope.DESCRIPTOR_OBJECT) { - withList { - writeObject(obj, this, context) - val schema = Schema(schemaHistory.toList()) - writeSchema(schema, this) - writeTransformSchema(TransformsSchema.build(schema, serializerFactory), this) - } - } return SerializedBytes(byteArrayOutput { var stream: OutputStream = it try { @@ -94,7 +96,16 @@ open class SerializationOutput constructor( stream = encoding.wrap(stream) } SectionId.DATA_AND_STOP.writeTo(stream) - stream.alsoAsByteBuffer(data.encodedSize().toInt(), data::encode) + encoderPool.reentrantRun { encoderImpl -> + val previousBuffer = encoderImpl.buffer + encoderImpl.setByteBuffer(OutputStreamWritableBuffer(stream)) + encoderImpl.writeObject(Envelope(CachingWrapper { data -> + writeObject(obj, data, context) + }) { + serializerFactory.getCachedSchema(schemaHistory) + }) + encoderImpl.setByteBuffer(previousBuffer) + } } finally { stream.close() } @@ -105,14 +116,6 @@ open class SerializationOutput constructor( writeObject(obj, data, obj.javaClass, context) } - open fun writeSchema(schema: Schema, data: Data) { - data.putObject(schema) - } - - open fun writeTransformSchema(transformsSchema: TransformsSchema, data: Data) { - data.putObject(transformsSchema) - } - internal fun writeObjectOrNull(obj: Any?, data: Data, type: Type, context: SerializationContext, debugIndent: Int) { if (obj == null) { data.putNull() diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/TransformsSchema.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/TransformsSchema.kt index 212ea85dd2..de99a98028 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/TransformsSchema.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/TransformsSchema.kt @@ -4,9 +4,10 @@ import net.corda.core.serialization.CordaSerializationTransformEnumDefault import net.corda.core.serialization.CordaSerializationTransformRename import net.corda.serialization.internal.model.LocalTypeInformation import org.apache.qpid.proton.amqp.DescribedType +import org.apache.qpid.proton.codec.Data import org.apache.qpid.proton.codec.DescribedTypeConstructor import java.io.NotSerializableException -import java.util.* +import java.util.EnumMap // NOTE: We are effectively going to replicate the annotations, we need to do this because // we can't instantiate instances of those annotation classes and this code needs to @@ -243,7 +244,7 @@ object TransformsAnnotationProcessor { * @property types maps class names to a map of transformation types. In turn those transformation types * are each a list of instances o that transform. */ -data class TransformsSchema(val types: Map>>) : DescribedType { +data class TransformsSchema(val types: Map>>) : CachingDescribedType, DescribedType { companion object : DescribedTypeConstructor { val DESCRIPTOR = AMQPDescriptorRegistry.TRANSFORM_SCHEMA.amqpDescriptor @@ -341,6 +342,12 @@ data class TransformsSchema(val types: Map serialize(obj: T): SerializedBytes { try {