CORDA-540: Cater for repeated object references found in the AMQP serialization graph (#1326)

Also provide unit test that clearly exposes the problem
This commit is contained in:
Viktor Kolomeyko 2017-08-29 17:59:03 +01:00 committed by GitHub
parent 9664954920
commit 0176184a86
6 changed files with 162 additions and 45 deletions

View File

@ -3,15 +3,12 @@ package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.internal.getStackTraceAsString
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.ByteSequence
import org.apache.qpid.proton.amqp.Binary
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.amqp.UnsignedByte
import org.apache.qpid.proton.amqp.*
import org.apache.qpid.proton.codec.Data
import java.io.NotSerializableException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.nio.ByteBuffer
import java.util.*
data class ObjectAndEnvelope<out T>(val obj: T, val envelope: Envelope)
@ -22,8 +19,7 @@ data class ObjectAndEnvelope<out T>(val obj: T, val envelope: Envelope)
* instances and threads.
*/
class DeserializationInput(internal val serializerFactory: SerializerFactory) {
// TODO: we're not supporting object refs yet
private val objectHistory: MutableList<Any> = ArrayList()
private val objectHistory: MutableList<Any> = mutableListOf()
internal companion object {
val BYTES_NEEDED_TO_PEEK: Int = 23
@ -115,19 +111,35 @@ class DeserializationInput(internal val serializerFactory: SerializerFactory) {
return if (obj == null) null else readObject(obj, schema, type)
}
internal fun readObject(obj: Any, schema: Schema, type: Type): Any {
if (obj is DescribedType) {
internal fun readObject(obj: Any, schema: Schema, type: Type): Any =
if (obj is DescribedType && ReferencedObject.DESCRIPTOR == obj.descriptor) {
// It must be a reference to an instance that has already been read, cheaply and quickly returning it by reference.
val objectIndex = (obj.described as UnsignedInteger).toInt()
if (objectIndex !in 0..objectHistory.size)
throw NotSerializableException("Retrieval of existing reference failed. Requested index $objectIndex " +
"is outside of the bounds for the list of size: ${objectHistory.size}")
val objectRetrieved = objectHistory[objectIndex]
if (!objectRetrieved::class.java.isSubClassOf(type))
throw NotSerializableException("Existing reference type mismatch. Expected: '$type', found: '${objectRetrieved::class.java}'")
objectRetrieved
}
else {
val objectRead = when (obj) {
is DescribedType -> {
// Look up serializer in factory by descriptor
val serializer = serializerFactory.get(obj.descriptor, schema)
if (serializer.type != type && with(serializer.type) { !isSubClassOf(type) && !materiallyEquivalentTo(type) })
throw NotSerializableException("Described type with descriptor ${obj.descriptor} was " +
"expected to be of type $type but was ${serializer.type}")
return serializer.readObject(obj.described, schema, this)
} else if (obj is Binary) {
return obj.array
} else {
return obj
serializer.readObject(obj.described, schema, this)
}
is Binary -> obj.array
else -> obj // this will be the case for primitive types like [boolean] et al.
}
// Store the reference in case we need it later on.
objectHistory.add(objectRead)
objectRead
}
/**

View File

@ -1,7 +1,8 @@
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.nameForType
import org.apache.qpid.proton.amqp.UnsignedInteger
import org.apache.qpid.proton.codec.Data
import java.io.NotSerializableException
import java.lang.reflect.Type
@ -15,6 +16,8 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
open val kotlinConstructor = constructorForDeserialization(clazz)
val javaConstructor by lazy { kotlinConstructor?.javaConstructor }
private val logger = loggerFor<ObjectSerializer>()
open internal val propertySerializers: Collection<PropertySerializer> by lazy {
propertiesForSerialization(kotlinConstructor, clazz, factory)
}
@ -50,10 +53,7 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any {
if (obj is UnsignedInteger) {
// TODO: Object refs
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
} else if (obj is List<*>) {
if (obj is List<*>) {
if (obj.size > propertySerializers.size) throw NotSerializableException("Too many properties in described type $typeName")
val params = obj.zip(propertySerializers).map { it.second.readProperty(it.first, schema, input) }
return construct(params)
@ -66,6 +66,12 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
private fun generateProvides(): List<String> = interfaces.map { nameForType(it) }
fun construct(properties: List<Any?>) = javaConstructor?.newInstance(*properties.toTypedArray()) ?:
fun construct(properties: List<Any?>): Any {
logger.debug { "Calling constructor: '$javaConstructor' with properties '$properties'" }
return javaConstructor?.newInstance(*properties.toTypedArray()) ?:
throw NotSerializableException("Attempt to deserialize an interface: $clazz. Serialized form is invalid.")
}
}

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.toBase64
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.loggerFor
import org.apache.qpid.proton.amqp.DescribedType
import org.apache.qpid.proton.amqp.UnsignedInteger
import org.apache.qpid.proton.amqp.UnsignedLong
import org.apache.qpid.proton.codec.Data
import org.apache.qpid.proton.codec.DescribedTypeConstructor
@ -24,6 +25,21 @@ val DESCRIPTOR_DOMAIN: String = "net.corda"
// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB
val AmqpHeaderV1_0: OpaqueBytes = OpaqueBytes("corda\u0001\u0000\u0000".toByteArray())
private enum class DescriptorRegistry(val id: Long) {
ENVELOPE(1),
SCHEMA(2),
OBJECT_DESCRIPTOR(3),
FIELD(4),
COMPOSITE_TYPE(5),
RESTRICTED_TYPE(6),
CHOICE(7),
REFERENCED_OBJECT(8),
;
val amqpDescriptor = UnsignedLong(id or DESCRIPTOR_TOP_32BITS)
}
/**
* This class wraps all serialized data, so that the schema can be carried along with it. We will provide various internal utilities
* to decompose and recompose with/without schema etc so that e.g. we can store objects with a (relationally) normalised out schema to
@ -32,7 +48,7 @@ val AmqpHeaderV1_0: OpaqueBytes = OpaqueBytes("corda\u0001\u0000\u0000".toByteAr
// TODO: make the schema parsing lazy since mostly schemas will have been seen before and we only need it if we don't recognise a type descriptor.
data class Envelope(val obj: Any?, val schema: Schema) : DescribedType {
companion object : DescribedTypeConstructor<Envelope> {
val DESCRIPTOR = UnsignedLong(1L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.ENVELOPE.amqpDescriptor
val DESCRIPTOR_OBJECT = Descriptor(null, DESCRIPTOR)
fun get(data: Data): Envelope {
@ -63,7 +79,7 @@ data class Envelope(val obj: Any?, val schema: Schema) : DescribedType {
*/
data class Schema(val types: List<TypeNotation>) : DescribedType {
companion object : DescribedTypeConstructor<Schema> {
val DESCRIPTOR = UnsignedLong(2L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.SCHEMA.amqpDescriptor
fun get(obj: Any): Schema {
val describedType = obj as DescribedType
@ -92,7 +108,7 @@ data class Schema(val types: List<TypeNotation>) : DescribedType {
data class Descriptor(val name: String?, val code: UnsignedLong? = null) : DescribedType {
companion object : DescribedTypeConstructor<Descriptor> {
val DESCRIPTOR = UnsignedLong(3L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.OBJECT_DESCRIPTOR.amqpDescriptor
fun get(obj: Any): Descriptor {
val describedType = obj as DescribedType
@ -130,7 +146,7 @@ data class Descriptor(val name: String?, val code: UnsignedLong? = null) : Descr
data class Field(val name: String, val type: String, val requires: List<String>, val default: String?, val label: String?, val mandatory: Boolean, val multiple: Boolean) : DescribedType {
companion object : DescribedTypeConstructor<Field> {
val DESCRIPTOR = UnsignedLong(4L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.FIELD.amqpDescriptor
fun get(obj: Any): Field {
val describedType = obj as DescribedType
@ -193,7 +209,7 @@ sealed class TypeNotation : DescribedType {
data class CompositeType(override val name: String, override val label: String?, override val provides: List<String>, override val descriptor: Descriptor, val fields: List<Field>) : TypeNotation() {
companion object : DescribedTypeConstructor<CompositeType> {
val DESCRIPTOR = UnsignedLong(5L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.COMPOSITE_TYPE.amqpDescriptor
fun get(describedType: DescribedType): CompositeType {
if (describedType.descriptor != DESCRIPTOR) {
@ -238,7 +254,7 @@ data class CompositeType(override val name: String, override val label: String?,
data class RestrictedType(override val name: String, override val label: String?, override val provides: List<String>, val source: String, override val descriptor: Descriptor, val choices: List<Choice>) : TypeNotation() {
companion object : DescribedTypeConstructor<RestrictedType> {
val DESCRIPTOR = UnsignedLong(6L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.RESTRICTED_TYPE.amqpDescriptor
fun get(describedType: DescribedType): RestrictedType {
if (describedType.descriptor != DESCRIPTOR) {
@ -281,7 +297,7 @@ data class RestrictedType(override val name: String, override val label: String?
data class Choice(val name: String, val value: String) : DescribedType {
companion object : DescribedTypeConstructor<Choice> {
val DESCRIPTOR = UnsignedLong(7L or DESCRIPTOR_TOP_32BITS)
val DESCRIPTOR = DescriptorRegistry.CHOICE.amqpDescriptor
fun get(obj: Any): Choice {
val describedType = obj as DescribedType
@ -308,6 +324,33 @@ data class Choice(val name: String, val value: String) : DescribedType {
}
}
data class ReferencedObject(private val refCounter: Int) : DescribedType {
companion object : DescribedTypeConstructor<ReferencedObject> {
val DESCRIPTOR = DescriptorRegistry.REFERENCED_OBJECT.amqpDescriptor
fun get(obj: Any): ReferencedObject {
val describedType = obj as DescribedType
if (describedType.descriptor != DESCRIPTOR) {
throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
}
return newInstance(describedType.described)
}
override fun getTypeClass(): Class<*> = ReferencedObject::class.java
override fun newInstance(described: Any?): ReferencedObject {
val unInt = described as? UnsignedInteger ?: throw IllegalStateException("Was expecting an UnsignedInteger")
return ReferencedObject(unInt.toInt())
}
}
override fun getDescriptor(): Any = DESCRIPTOR
override fun getDescribed(): UnsignedInteger = UnsignedInteger(refCounter)
override fun toString(): String = "<refObject refCounter=$refCounter/>"
}
private val ARRAY_HASH: String = "Array = true"
private val ALREADY_SEEN_HASH: String = "Already seen = true"
private val NULLABLE_HASH: String = "Nullable = true"

View File

@ -155,6 +155,19 @@ fun Data.withList(block: Data.() -> Unit) {
exit() // exit list
}
/**
* Extension helper for outputting reference to already observed object
*/
fun Data.writeReferencedObject(refObject: ReferencedObject) {
// Write described
putDescribed()
enter()
// Write descriptor
putObject(refObject.descriptor)
putUnsignedInteger(refObject.described)
exit() // exit described
}
private fun resolveTypeVariables(actualType: Type, contextType: Type?): Type {
val resolvedType = if (contextType != null) TypeToken.of(contextType).resolveType(actualType).type else actualType
// TODO: surely we check it is concrete at this point with no TypeVariables

View File

@ -16,15 +16,14 @@ import kotlin.collections.LinkedHashSet
* instances and threads.
*/
open class SerializationOutput(internal val serializerFactory: SerializerFactory) {
// TODO: we're not supporting object refs yet
private val objectHistory: MutableMap<Any, Int> = IdentityHashMap()
private val serializerHistory: MutableSet<AMQPSerializer<*>> = LinkedHashSet()
internal val schemaHistory: MutableSet<TypeNotation> = LinkedHashSet()
/**
* Serialize the given object to AMQP, wrapped in our [Envelope] wrapper which carries an AMQP 1.0 schema, and prefixed
* with a header to indicate that this is serialized with AMQP and not [Kryo], and what version of the Corda implementation
* with a header to indicate that this is serialized with AMQP and not Kryo, and what version of the Corda implementation
* of AMQP serialization constructed the serialized form.
*/
@Throws(NotSerializableException::class)
@ -81,7 +80,20 @@ open class SerializationOutput(internal val serializerFactory: SerializerFactory
serializerHistory.add(serializer)
serializer.writeClassInfo(this)
}
val retrievedRefCount = objectHistory[obj]
if(retrievedRefCount == null) {
serializer.writeObject(obj, data, type, this)
// Important to do it after serialization such that dependent object will have preceding reference numbers
// assigned to them first as they will be first read from the stream on receiving end.
// Skip for primitive types as they are too small and overhead of referencing them will be much higher than their content
if(type is Class<*> && !type.isPrimitive) {
objectHistory.put(obj, objectHistory.size)
}
}
else {
data.writeReferencedObject(ReferencedObject(retrievedRefCount))
}
}
open internal fun writeTypeNotations(vararg typeNotation: TypeNotation): Boolean {

View File

@ -22,6 +22,7 @@ import org.apache.qpid.proton.amqp.*
import org.apache.qpid.proton.codec.DecoderImpl
import org.apache.qpid.proton.codec.EncoderImpl
import org.junit.Ignore
import org.junit.Assert.assertSame
import org.junit.Test
import java.io.IOException
import java.io.NotSerializableException
@ -29,9 +30,7 @@ import java.math.BigDecimal
import java.nio.ByteBuffer
import java.time.*
import java.time.temporal.ChronoUnit
import java.time.temporal.TemporalUnit
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@ -140,13 +139,13 @@ class SerializationOutputTests {
data class PolymorphicProperty(val foo: FooInterface?)
private fun serdes(obj: Any,
private inline fun<reified T : Any> serdes(obj: T,
factory: SerializerFactory = SerializerFactory (
AllWhitelist, ClassLoader.getSystemClassLoader()),
freshDeserializationFactory: SerializerFactory = SerializerFactory(
AllWhitelist, ClassLoader.getSystemClassLoader()),
expectedEqual: Boolean = true,
expectDeserializedEqual: Boolean = true): Any {
expectDeserializedEqual: Boolean = true): T {
val ser = SerializationOutput(factory)
val bytes = ser.serialize(obj)
@ -158,6 +157,7 @@ class SerializationOutputTests {
this.register(CompositeType.DESCRIPTOR, CompositeType.Companion)
this.register(Choice.DESCRIPTOR, Choice.Companion)
this.register(RestrictedType.DESCRIPTOR, RestrictedType.Companion)
this.register(ReferencedObject.DESCRIPTOR, ReferencedObject.Companion)
}
EncoderImpl(decoder)
decoder.setByteBuffer(ByteBuffer.wrap(bytes.bytes, 8, bytes.size - 8))
@ -436,7 +436,7 @@ class SerializationOutputTests {
throw IllegalStateException("Layer 2", t)
}
} catch(t: Throwable) {
val desThrowable = serdes(t, factory, factory2, false) as Throwable
val desThrowable = serdes(t, factory, factory2, false)
assertSerializedThrowableEquivalent(t, desThrowable)
}
}
@ -468,7 +468,7 @@ class SerializationOutputTests {
throw e
}
} catch(t: Throwable) {
val desThrowable = serdes(t, factory, factory2, false) as Throwable
val desThrowable = serdes(t, factory, factory2, false)
assertSerializedThrowableEquivalent(t, desThrowable)
}
}
@ -538,7 +538,6 @@ class SerializationOutputTests {
AbstractAMQPSerializationScheme.registerCustomSerializers(factory2)
val desState = serdes(state, factory, factory2, expectedEqual = false, expectDeserializedEqual = false)
assertTrue(desState is TransactionState<*>)
assertTrue((desState as TransactionState<*>).data is FooState)
assertTrue(desState.notary == state.notary)
assertTrue(desState.encumbrance == state.encumbrance)
@ -763,4 +762,36 @@ class SerializationOutputTests {
val obj = StateRef(SecureHash.randomSHA256(), 0)
serdes(obj, factory, factory2)
}
interface Container
data class SimpleContainer(val one: String, val another: String) : Container
data class ParentContainer(val left: SimpleContainer, val right: Container)
@Test
fun `test object referenced multiple times`() {
val simple = SimpleContainer("Fred", "Ginger")
val parentContainer = ParentContainer(simple, simple)
assertSame(parentContainer.left, parentContainer.right)
val parentCopy = serdes(parentContainer)
assertSame(parentCopy.left, parentCopy.right)
}
data class TestNode(val content: String, val children: MutableCollection<TestNode> = ArrayList())
@Test
@Ignore("Ignored due to cyclic graphs not currently supported by AMQP serialization")
fun `test serialization of cyclic graph`() {
val nodeA = TestNode("A")
val nodeB = TestNode("B", ArrayList(Arrays.asList(nodeA)))
nodeA.children.add(nodeB)
// Also blows with StackOverflow error
assertTrue(nodeB.hashCode() > 0)
val bCopy = serdes(nodeB)
assertEquals("A", bCopy.children.single().content)
}
}