diff --git a/.gitignore b/.gitignore
index 270ec992fe..0cbd0c082a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,6 +33,9 @@ lib/dokka.jar
.idea/shelf
.idea/dataSources
+# Include the -parameters compiler option by default in IntelliJ required for serialization.
+!.idea/compiler.xml
+
# if you remove the above rule, at least ignore the following:
# User-specific stuff:
diff --git a/.idea/compiler.xml b/.idea/compiler.xml
new file mode 100644
index 0000000000..4594ed2b96
--- /dev/null
+++ b/.idea/compiler.xml
@@ -0,0 +1,91 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index 683b88a17d..9e842dd05a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -106,7 +106,7 @@ allprojects {
}
tasks.withType(JavaCompile) {
- options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Xlint:-options"
+ options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Xlint:-options" << "-parameters"
}
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).all {
diff --git a/core/build.gradle b/core/build.gradle
index e504b99e82..0e24a0eb52 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -74,6 +74,9 @@ dependencies {
// Requery: SQL based query & persistence for Kotlin
compile "io.requery:requery-kotlin:$requery_version"
+
+ // For AMQP serialisation.
+ compile "org.apache.qpid:proton-j:0.18.0"
}
configurations {
diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt
index 2039551e0f..d9fdbbcf6f 100644
--- a/core/src/main/kotlin/net/corda/core/Utils.kt
+++ b/core/src/main/kotlin/net/corda/core/Utils.kt
@@ -3,7 +3,6 @@
package net.corda.core
-import com.google.common.base.Function
import com.google.common.base.Throwables
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.*
@@ -24,6 +23,7 @@ import java.nio.file.*
import java.nio.file.attribute.FileAttribute
import java.time.Duration
import java.time.temporal.Temporal
+import java.util.HashMap
import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock
import java.util.function.BiConsumer
@@ -32,6 +32,13 @@ import java.util.zip.Deflater
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream
+import kotlin.collections.Iterable
+import kotlin.collections.LinkedHashMap
+import kotlin.collections.List
+import kotlin.collections.filter
+import kotlin.collections.firstOrNull
+import kotlin.collections.fold
+import kotlin.collections.forEach
import kotlin.concurrent.withLock
import kotlin.reflect.KProperty
@@ -452,3 +459,9 @@ fun codePointsString(vararg codePoints: Int): String {
codePoints.forEach { builder.append(Character.toChars(it)) }
return builder.toString()
}
+
+fun Class.checkNotUnorderedHashMap() {
+ if (HashMap::class.java.isAssignableFrom(this) && !LinkedHashMap::class.java.isAssignableFrom(this)) {
+ throw NotSerializableException("Map type $this is unstable under iteration. Suggested fix: use LinkedHashMap instead.")
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt
index 208f2a8131..c8cdd4c171 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt
@@ -9,7 +9,6 @@ import java.lang.reflect.Type
import java.util.*
import kotlin.reflect.KFunction
import kotlin.reflect.KParameter
-import kotlin.reflect.full.primaryConstructor
import kotlin.reflect.jvm.javaConstructor
import kotlin.reflect.jvm.javaType
@@ -64,18 +63,11 @@ class FlowLogicRefFactory(val flowWhitelist: Map>) : Singlet
}
/**
- * Create a [FlowLogicRef] by matching against the available constructors and the given args.
+ * Create a [FlowLogicRef] by assuming a single constructor and the given args.
*/
fun create(type: Class>, vararg args: Any?): FlowLogicRef {
- // If it's not a Kotlin class, do the Java path.
- if (type.kotlin.primaryConstructor == null)
- return createJava(type, *args)
-
- // Find the right constructor to use, based on passed argument types. This is for when we don't know
- // the right argument names.
- //
// TODO: This is used via RPC but it's probably better if we pass in argument names and values explicitly
- // to avoid guessing which constructor too use.
+ // to avoid requiring only a single constructor.
val argTypes = args.map { it?.javaClass }
val constructor = try {
type.kotlin.constructors.single { ctor ->
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/AMQPPrimitiveSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/AMQPPrimitiveSerializer.kt
new file mode 100644
index 0000000000..2935b19cb9
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/AMQPPrimitiveSerializer.kt
@@ -0,0 +1,23 @@
+package net.corda.core.serialization.amqp
+
+import com.google.common.primitives.Primitives
+import org.apache.qpid.proton.codec.Data
+import java.lang.reflect.Type
+
+/**
+ * Serializer / deserializer for native AMQP types (Int, Float, String etc).
+ */
+class AMQPPrimitiveSerializer(clazz: Class<*>) : AMQPSerializer {
+ override val typeDescriptor: String = SerializerFactory.primitiveTypeName(Primitives.wrap(clazz))!!
+ override val type: Type = clazz
+
+ // NOOP since this is a primitive type.
+ override fun writeClassInfo(output: SerializationOutput) {
+ }
+
+ override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
+ data.putObject(obj)
+ }
+
+ override fun readObject(obj: Any, envelope: Envelope, input: DeserializationInput): Any = obj
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/AMQPSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/AMQPSerializer.kt
new file mode 100644
index 0000000000..20465bb9cb
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/AMQPSerializer.kt
@@ -0,0 +1,38 @@
+package net.corda.core.serialization.amqp
+
+import org.apache.qpid.proton.codec.Data
+import java.lang.reflect.Type
+
+/**
+ * Implemented to serialize and deserialize different types of objects to/from AMQP.
+ */
+interface AMQPSerializer {
+ /**
+ * The JVM type this can serialize and deserialize.
+ */
+ val type: Type
+
+ /**
+ * Textual unique representation of the JVM type this represents. Will be encoded into the AMQP stream and
+ * will appear in the schema.
+ *
+ * This should be unique enough that we can use one global cache of [AMQPSerializer]s and use this as the look up key.
+ */
+ val typeDescriptor: String
+
+ /**
+ * Add anything required to the AMQP schema via [SerializationOutput.writeTypeNotations] and any dependent serializers
+ * via [SerializationOutput.requireSerializer]. e.g. for the elements of an array.
+ */
+ fun writeClassInfo(output: SerializationOutput)
+
+ /**
+ * Write the given object, with declared type, to the output.
+ */
+ fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput)
+
+ /**
+ * Read the given object from the input. The envelope is provided in case the schema is required.
+ */
+ fun readObject(obj: Any, envelope: Envelope, input: DeserializationInput): Any
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/ArraySerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/ArraySerializer.kt
new file mode 100644
index 0000000000..2b1c6f5c55
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/ArraySerializer.kt
@@ -0,0 +1,62 @@
+package net.corda.core.serialization.amqp
+
+import org.apache.qpid.proton.codec.Data
+import java.io.NotSerializableException
+import java.lang.reflect.GenericArrayType
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+
+/**
+ * Serialization / deserialization of arrays.
+ */
+class ArraySerializer(override val type: Type) : AMQPSerializer {
+ private val typeName = type.typeName
+
+ override val typeDescriptor = "$DESCRIPTOR_DOMAIN:${fingerprintForType(type)}"
+
+ private val elementType: Type = makeElementType()
+
+ private val typeNotation: TypeNotation = RestrictedType(typeName, null, emptyList(), "list", Descriptor(typeDescriptor, null), emptyList())
+
+ private fun makeElementType(): Type {
+ return (type as? Class<*>)?.componentType ?: (type as GenericArrayType).genericComponentType
+ }
+
+ override fun writeClassInfo(output: SerializationOutput) {
+ if (output.writeTypeNotations(typeNotation)) {
+ output.requireSerializer(elementType)
+ }
+ }
+
+ override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
+ // Write described
+ data.withDescribed(typeNotation.descriptor) {
+ withList {
+ for (entry in obj as Array<*>) {
+ output.writeObjectOrNull(entry, this, elementType)
+ }
+ }
+ }
+ }
+
+ override fun readObject(obj: Any, envelope: Envelope, input: DeserializationInput): Any {
+ return (obj as List<*>).map { input.readObjectOrNull(it, envelope, elementType) }.toArrayOfType(elementType)
+ }
+
+ private fun List.toArrayOfType(type: Type): Any {
+ val elementType: Class<*> = if (type is Class<*>) {
+ type
+ } else if (type is ParameterizedType) {
+ type.rawType as Class<*>
+ } else {
+ throw NotSerializableException("Unexpected array element type $type")
+ }
+ val list = this
+ return java.lang.reflect.Array.newInstance(elementType, this.size).apply {
+ val array = this
+ for (i in 0..lastIndex) {
+ java.lang.reflect.Array.set(array, i, list[i])
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/CollectionSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/CollectionSerializer.kt
new file mode 100644
index 0000000000..3e2d74002c
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/CollectionSerializer.kt
@@ -0,0 +1,59 @@
+package net.corda.core.serialization.amqp
+
+import org.apache.qpid.proton.codec.Data
+import java.io.NotSerializableException
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+import java.util.*
+import kotlin.collections.Collection
+import kotlin.collections.LinkedHashSet
+import kotlin.collections.Set
+
+/**
+ * Serialization / deserialization of predefined set of supported [Collection] types covering mostly [List]s and [Set]s.
+ */
+class CollectionSerializer(val declaredType: ParameterizedType) : AMQPSerializer {
+ override val type: Type = declaredType as? DeserializedParameterizedType ?: DeserializedParameterizedType.make(declaredType.toString())
+ private val typeName = declaredType.toString()
+ override val typeDescriptor = "$DESCRIPTOR_DOMAIN:${fingerprintForType(type)}"
+
+ companion object {
+ private val supportedTypes: Map>, (Collection<*>) -> Collection<*>> = mapOf(
+ Collection::class.java to { coll -> coll },
+ List::class.java to { coll -> coll },
+ Set::class.java to { coll -> Collections.unmodifiableSet(LinkedHashSet(coll)) },
+ SortedSet::class.java to { coll -> Collections.unmodifiableSortedSet(TreeSet(coll)) },
+ NavigableSet::class.java to { coll -> Collections.unmodifiableNavigableSet(TreeSet(coll)) }
+ )
+ }
+
+ private val concreteBuilder: (Collection<*>) -> Collection<*> = findConcreteType(declaredType.rawType as Class<*>)
+
+ private fun findConcreteType(clazz: Class<*>): (Collection<*>) -> Collection<*> {
+ return supportedTypes[clazz] ?: throw NotSerializableException("Unsupported map type $clazz.")
+ }
+
+ private val typeNotation: TypeNotation = RestrictedType(typeName, null, emptyList(), "list", Descriptor(typeDescriptor, null), emptyList())
+
+ override fun writeClassInfo(output: SerializationOutput) {
+ if (output.writeTypeNotations(typeNotation)) {
+ output.requireSerializer(declaredType.actualTypeArguments[0])
+ }
+ }
+
+ override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
+ // Write described
+ data.withDescribed(typeNotation.descriptor) {
+ withList {
+ for (entry in obj as Collection<*>) {
+ output.writeObjectOrNull(entry, this, declaredType.actualTypeArguments[0])
+ }
+ }
+ }
+ }
+
+ override fun readObject(obj: Any, envelope: Envelope, input: DeserializationInput): Any {
+ // TODO: Can we verify the entries in the list?
+ return concreteBuilder((obj as List<*>).map { input.readObjectOrNull(it, envelope, declaredType.actualTypeArguments[0]) })
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializationInput.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializationInput.kt
new file mode 100644
index 0000000000..b47d75b8bc
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializationInput.kt
@@ -0,0 +1,89 @@
+package net.corda.core.serialization.amqp
+
+import com.google.common.base.Throwables
+import net.corda.core.serialization.SerializedBytes
+import org.apache.qpid.proton.amqp.DescribedType
+import org.apache.qpid.proton.codec.Data
+import java.io.NotSerializableException
+import java.lang.reflect.Type
+import java.nio.ByteBuffer
+import java.util.*
+
+/**
+ * Main entry point for deserializing an AMQP encoded object.
+ *
+ * @param serializerFactory This is the factory for [AMQPSerializer] instances and can be shared across multiple
+ * instances and threads.
+ */
+class DeserializationInput(private val serializerFactory: SerializerFactory = SerializerFactory()) {
+ // TODO: we're not supporting object refs yet
+ private val objectHistory: MutableList = ArrayList()
+
+ @Throws(NotSerializableException::class)
+ inline fun deserialize(bytes: SerializedBytes): T = deserialize(bytes, T::class.java)
+
+ /**
+ * This is the main entry point for deserialization of AMQP payloads, and expects a byte sequence involving a header
+ * indicating what version of Corda serialization was used, followed by an [Envelope] which carries the object to
+ * be deserialized and a schema describing the types of the objects.
+ */
+ @Throws(NotSerializableException::class)
+ fun deserialize(bytes: SerializedBytes, clazz: Class): T {
+ try {
+ // Check that the lead bytes match expected header
+ if (!subArraysEqual(bytes.bytes, 0, 8, AmqpHeaderV1_0.bytes, 0)) {
+ throw NotSerializableException("Serialization header does not match.")
+ }
+ val data = Data.Factory.create()
+ val size = data.decode(ByteBuffer.wrap(bytes.bytes, 8, bytes.size - 8))
+ if (size.toInt() != bytes.size - 8) {
+ throw NotSerializableException("Unexpected size of data")
+ }
+ val envelope = Envelope.get(data)
+ // Now pick out the obj and schema from the envelope.
+ return clazz.cast(readObjectOrNull(envelope.obj, envelope, clazz))
+ } catch(nse: NotSerializableException) {
+ throw nse
+ } catch(t: Throwable) {
+ throw NotSerializableException("Unexpected throwable: ${t.message} ${Throwables.getStackTraceAsString(t)}")
+ } finally {
+ objectHistory.clear()
+ }
+ }
+
+ internal fun readObjectOrNull(obj: Any?, envelope: Envelope, type: Type): Any? {
+ if (obj == null) {
+ return null
+ } else {
+ return readObject(obj, envelope, type)
+ }
+ }
+
+ internal fun readObject(obj: Any, envelope: Envelope, type: Type): Any {
+ if (obj is DescribedType) {
+ // Look up serializer in factory by descriptor
+ val serializer = serializerFactory.get(obj.descriptor, envelope)
+ if (serializer.type != type && !serializer.type.isSubClassOf(type)) throw NotSerializableException("Described type with descriptor ${obj.descriptor} was expected to be of type $type")
+ return serializer.readObject(obj.described, envelope, this)
+ } else {
+ return obj
+ }
+ }
+
+ private fun Type.isSubClassOf(type: Type): Boolean {
+ return type == Object::class.java ||
+ (this is Class<*> && type is Class<*> && type.isAssignableFrom(this)) ||
+ (this is DeserializedParameterizedType && type is Class<*> && this.rawType == type && this.isFullyWildcarded)
+ }
+
+ private fun subArraysEqual(a: ByteArray, aOffset: Int, length: Int, b: ByteArray, bOffset: Int): Boolean {
+ if (aOffset + length > a.size || bOffset + length > b.size) throw IndexOutOfBoundsException()
+ var bytesRemaining = length
+ var aPos = aOffset
+ var bPos = bOffset
+ while (bytesRemaining-- > 0) {
+ if (a[aPos++] != b[bPos++]) return false
+ }
+ return true
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializedGenericArrayType.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializedGenericArrayType.kt
new file mode 100644
index 0000000000..5183c00954
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializedGenericArrayType.kt
@@ -0,0 +1,18 @@
+package net.corda.core.serialization.amqp
+
+import java.lang.reflect.GenericArrayType
+import java.lang.reflect.Type
+import java.util.*
+
+/**
+ * Implementation of [GenericArrayType] that we can actually construct.
+ */
+class DeserializedGenericArrayType(private val componentType: Type) : GenericArrayType {
+ override fun getGenericComponentType(): Type = componentType
+ override fun getTypeName(): String = "${componentType.typeName}[]"
+ override fun toString(): String = typeName
+ override fun hashCode(): Int = Objects.hashCode(componentType)
+ override fun equals(other: Any?): Boolean {
+ return other is GenericArrayType && componentType.equals(other.genericComponentType)
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializedParameterizedType.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializedParameterizedType.kt
new file mode 100644
index 0000000000..2cd0ae1298
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/DeserializedParameterizedType.kt
@@ -0,0 +1,162 @@
+package net.corda.core.serialization.amqp
+
+import java.io.NotSerializableException
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+import java.lang.reflect.TypeVariable
+import java.util.*
+
+/**
+ * Implementation of [ParameterizedType] that we can actually construct, and a parser from the string representation
+ * of the JDK implementation which we use as the textual format in the AMQP schema.
+ */
+class DeserializedParameterizedType(private val rawType: Class<*>, private val params: Array, private val ownerType: Type? = null) : ParameterizedType {
+ init {
+ if (params.isEmpty()) {
+ throw NotSerializableException("Must be at least one parameter type in a ParameterizedType")
+ }
+ if (params.size != rawType.typeParameters.size) {
+ throw NotSerializableException("Expected ${rawType.typeParameters.size} for ${rawType.name} but found ${params.size}")
+ }
+ // We do not check bounds. Both our use cases (Collection and Map) are not bounded.
+ if (rawType.typeParameters.any { boundedType(it) }) throw NotSerializableException("Bounded types in ParameterizedTypes not supported, but found a bound in $rawType")
+ }
+
+ private fun boundedType(type: TypeVariable>): Boolean {
+ return !(type.bounds.size == 1 && type.bounds[0] == Object::class.java)
+ }
+
+ val isFullyWildcarded: Boolean = params.all { it == SerializerFactory.AnyType }
+
+ private val _typeName: String = makeTypeName()
+
+ private fun makeTypeName(): String {
+ return if (isFullyWildcarded) {
+ rawType.name
+ } else {
+ val paramsJoined = params.map { it.typeName }.joinToString(", ")
+ "${rawType.name}<$paramsJoined>"
+ }
+ }
+
+ companion object {
+ // Maximum depth/nesting of generics before we suspect some DoS attempt.
+ const val MAX_DEPTH: Int = 32
+
+ fun make(name: String, cl: ClassLoader = DeserializedParameterizedType::class.java.classLoader): Type {
+ val paramTypes = ArrayList()
+ val pos = parseTypeList("$name>", paramTypes, cl)
+ if (pos <= name.length) {
+ throw NotSerializableException("Malformed string form of ParameterizedType. Unexpected '>' at character position $pos of $name.")
+ }
+ if (paramTypes.size != 1) {
+ throw NotSerializableException("Expected only one type, but got $paramTypes")
+ }
+ return paramTypes[0]
+ }
+
+ private fun parseTypeList(params: String, types: MutableList, cl: ClassLoader, depth: Int = 0): Int {
+ var pos = 0
+ var typeStart = 0
+ var needAType = true
+ var skippingWhitespace = false
+ while (pos < params.length) {
+ if (params[pos] == '<') {
+ val typeEnd = pos++
+ val paramTypes = ArrayList()
+ pos = parseTypeParams(params, pos, paramTypes, cl, depth + 1)
+ types += makeParameterizedType(params.substring(typeStart, typeEnd).trim(), paramTypes, cl)
+ typeStart = pos
+ needAType = false
+ } else if (params[pos] == ',') {
+ val typeEnd = pos++
+ val typeName = params.substring(typeStart, typeEnd).trim()
+ if (!typeName.isEmpty()) {
+ types += makeType(typeName, cl)
+ } else if (needAType) {
+ throw NotSerializableException("Expected a type, not ','")
+ }
+ typeStart = pos
+ needAType = true
+ } else if (params[pos] == '>') {
+ val typeEnd = pos++
+ val typeName = params.substring(typeStart, typeEnd).trim()
+ if (!typeName.isEmpty()) {
+ types += makeType(typeName, cl)
+ } else if (needAType) {
+ throw NotSerializableException("Expected a type, not '>'")
+ }
+ return pos
+ } else {
+ // Skip forwards, checking character types
+ if (pos == typeStart) {
+ skippingWhitespace = false
+ if (params[pos].isWhitespace()) {
+ typeStart = pos++
+ } else if (!needAType) {
+ throw NotSerializableException("Not expecting a type")
+ } else if (params[pos] == '*') {
+ pos++
+ } else if (!params[pos].isJavaIdentifierStart()) {
+ throw NotSerializableException("Invalid character at start of type: ${params[pos]}")
+ } else {
+ pos++
+ }
+ } else {
+ if (params[pos].isWhitespace()) {
+ pos++
+ skippingWhitespace = true
+ } else if (!skippingWhitespace && (params[pos] == '.' || params[pos].isJavaIdentifierPart())) {
+ pos++
+ } else {
+ throw NotSerializableException("Invalid character in middle of type: ${params[pos]}")
+ }
+ }
+ }
+ }
+ throw NotSerializableException("Missing close generics '>'")
+ }
+
+ private fun makeType(typeName: String, cl: ClassLoader): Type {
+ // Not generic
+ return if (typeName == "*") SerializerFactory.AnyType else Class.forName(typeName, false, cl)
+ }
+
+ private fun makeParameterizedType(rawTypeName: String, args: MutableList, cl: ClassLoader): Type {
+ return DeserializedParameterizedType(makeType(rawTypeName, cl) as Class<*>, args.toTypedArray(), null)
+ }
+
+ private fun parseTypeParams(params: String, startPos: Int, paramTypes: MutableList, cl: ClassLoader, depth: Int): Int {
+ if (depth == MAX_DEPTH) {
+ throw NotSerializableException("Maximum depth of nested generics reached: $depth")
+ }
+ return startPos + parseTypeList(params.substring(startPos), paramTypes, cl, depth)
+ }
+ }
+
+ override fun getRawType(): Type = rawType
+
+ override fun getOwnerType(): Type? = ownerType
+
+ override fun getActualTypeArguments(): Array = params
+
+ override fun getTypeName(): String = _typeName
+
+ override fun toString(): String = _typeName
+
+ override fun hashCode(): Int {
+ return Arrays.hashCode(this.actualTypeArguments) xor Objects.hashCode(this.ownerType) xor Objects.hashCode(this.rawType)
+ }
+
+ override fun equals(other: Any?): Boolean {
+ if (other is ParameterizedType) {
+ if (this === other) {
+ return true
+ } else {
+ return this.ownerType == other.ownerType && this.rawType == other.rawType && Arrays.equals(this.actualTypeArguments, other.actualTypeArguments)
+ }
+ } else {
+ return false
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/MapSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/MapSerializer.kt
new file mode 100644
index 0000000000..2ea61c6598
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/MapSerializer.kt
@@ -0,0 +1,66 @@
+package net.corda.core.serialization.amqp
+
+import net.corda.core.checkNotUnorderedHashMap
+import org.apache.qpid.proton.codec.Data
+import java.io.NotSerializableException
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+import java.util.*
+import kotlin.collections.Map
+import kotlin.collections.iterator
+import kotlin.collections.map
+
+/**
+ * Serialization / deserialization of certain supported [Map] types.
+ */
+class MapSerializer(val declaredType: ParameterizedType) : AMQPSerializer {
+ override val type: Type = declaredType as? DeserializedParameterizedType ?: DeserializedParameterizedType.make(declaredType.toString())
+ private val typeName = declaredType.toString()
+ override val typeDescriptor = "$DESCRIPTOR_DOMAIN:${fingerprintForType(type)}"
+
+ companion object {
+ private val supportedTypes: Map>, (Map<*, *>) -> Map<*, *>> = mapOf(
+ Map::class.java to { map -> Collections.unmodifiableMap(map) },
+ SortedMap::class.java to { map -> Collections.unmodifiableSortedMap(TreeMap(map)) },
+ NavigableMap::class.java to { map -> Collections.unmodifiableNavigableMap(TreeMap(map)) }
+ )
+ }
+
+ private val concreteBuilder: (Map<*, *>) -> Map<*, *> = findConcreteType(declaredType.rawType as Class<*>)
+
+ private fun findConcreteType(clazz: Class<*>): (Map<*, *>) -> Map<*, *> {
+ return supportedTypes[clazz] ?: throw NotSerializableException("Unsupported map type $clazz.")
+ }
+
+ private val typeNotation: TypeNotation = RestrictedType(typeName, null, emptyList(), "map", Descriptor(typeDescriptor, null), emptyList())
+
+ override fun writeClassInfo(output: SerializationOutput) {
+ if (output.writeTypeNotations(typeNotation)) {
+ output.requireSerializer(declaredType.actualTypeArguments[0])
+ output.requireSerializer(declaredType.actualTypeArguments[1])
+ }
+ }
+
+ override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
+ obj.javaClass.checkNotUnorderedHashMap()
+ // Write described
+ data.withDescribed(typeNotation.descriptor) {
+ // Write map
+ data.putMap()
+ data.enter()
+ for (entry in obj as Map<*, *>) {
+ output.writeObjectOrNull(entry.key, data, declaredType.actualTypeArguments[0])
+ output.writeObjectOrNull(entry.value, data, declaredType.actualTypeArguments[1])
+ }
+ data.exit() // exit map
+ }
+ }
+
+ override fun readObject(obj: Any, envelope: Envelope, input: DeserializationInput): Any {
+ // TODO: General generics question. Do we need to validate that entries in Maps and Collections match the generic type? Is it a security hole?
+ val entries: Iterable> = (obj as Map<*, *>).map { readEntry(envelope, input, it) }
+ return concreteBuilder(entries.toMap())
+ }
+
+ private fun readEntry(envelope: Envelope, input: DeserializationInput, entry: Map.Entry) = input.readObjectOrNull(entry.key, envelope, declaredType.actualTypeArguments[0]) to input.readObjectOrNull(entry.value, envelope, declaredType.actualTypeArguments[1])
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/ObjectSerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/ObjectSerializer.kt
new file mode 100644
index 0000000000..2ccfad81d6
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/ObjectSerializer.kt
@@ -0,0 +1,74 @@
+package net.corda.core.serialization.amqp
+
+import org.apache.qpid.proton.amqp.UnsignedInteger
+import org.apache.qpid.proton.codec.Data
+import java.io.NotSerializableException
+import java.lang.reflect.Constructor
+import java.lang.reflect.Type
+import kotlin.reflect.jvm.javaConstructor
+
+/**
+ * Responsible for serializing and deserializing a regular object instance via a series of properties (matched with a constructor).
+ */
+class ObjectSerializer(val clazz: Class<*>) : AMQPSerializer {
+ override val type: Type get() = clazz
+ private val javaConstructor: Constructor?
+ private val propertySerializers: Collection
+
+ init {
+ val kotlinConstructor = constructorForDeserialization(clazz)
+ javaConstructor = kotlinConstructor?.javaConstructor
+ propertySerializers = propertiesForSerialization(kotlinConstructor, clazz)
+ }
+ private val typeName = clazz.name
+ override val typeDescriptor = "$DESCRIPTOR_DOMAIN:${fingerprintForType(type)}"
+ private val interfaces = interfacesForSerialization(clazz) // TODO maybe this proves too much and we need annotations to restrict.
+
+ private val typeNotation: TypeNotation = CompositeType(typeName, null, generateProvides(), Descriptor(typeDescriptor, null), generateFields())
+
+ override fun writeClassInfo(output: SerializationOutput) {
+ output.writeTypeNotations(typeNotation)
+ for (iface in interfaces) {
+ output.requireSerializer(iface)
+ }
+ }
+
+ override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
+ // Write described
+ data.withDescribed(typeNotation.descriptor) {
+ // Write list
+ withList {
+ for (property in propertySerializers) {
+ property.writeProperty(obj, this, output)
+ }
+ }
+ }
+ }
+
+ override fun readObject(obj: Any, envelope: Envelope, input: DeserializationInput): Any {
+ if (obj is UnsignedInteger) {
+ // TODO: Object refs
+ TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
+ } else if (obj is List<*>) {
+ if (obj.size > propertySerializers.size) throw NotSerializableException("Too many properties in described type $typeName")
+ val params = obj.zip(propertySerializers).map { it.second.readProperty(it.first, envelope, input) }
+ return construct(params)
+ } else throw NotSerializableException("Body of described type is unexpected $obj")
+ }
+
+ private fun generateFields(): List {
+ return propertySerializers.map { Field(it.name, it.type, it.requires, it.default, null, it.mandatory, false) }
+ }
+
+ private fun generateProvides(): List {
+ return interfaces.map { it.typeName }
+ }
+
+
+ fun construct(properties: List): Any {
+ if (javaConstructor == null) {
+ throw NotSerializableException("Attempt to deserialize an interface: $clazz. Serialized form is invalid.")
+ }
+ return javaConstructor.newInstance(*properties.toTypedArray())
+ }
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/PropertySerializer.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/PropertySerializer.kt
new file mode 100644
index 0000000000..50cb6c5581
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/PropertySerializer.kt
@@ -0,0 +1,93 @@
+package net.corda.core.serialization.amqp
+
+import org.apache.qpid.proton.codec.Data
+import java.lang.reflect.Method
+import kotlin.reflect.full.memberProperties
+import kotlin.reflect.jvm.javaGetter
+
+/**
+ * Base class for serialization of a property of an object.
+ */
+sealed class PropertySerializer(val name: String, val readMethod: Method) {
+ abstract fun writeProperty(obj: Any?, data: Data, output: SerializationOutput)
+ abstract fun readProperty(obj: Any?, envelope: Envelope, input: DeserializationInput): Any?
+
+ val type: String = generateType()
+ val requires: List = generateRequires()
+ val default: String? = generateDefault()
+ val mandatory: Boolean = generateMandatory()
+
+ private val isInterface: Boolean get() = (readMethod.genericReturnType as? Class<*>)?.isInterface ?: false
+ private val isJVMPrimitive: Boolean get() = (readMethod.genericReturnType as? Class<*>)?.isPrimitive ?: false
+
+ private fun generateType(): String {
+ return if (isInterface) "*" else {
+ val primitiveName = SerializerFactory.primitiveTypeName(readMethod.genericReturnType)
+ return primitiveName ?: readMethod.genericReturnType.typeName
+ }
+ }
+
+ private fun generateRequires(): List {
+ return if (isInterface) listOf(readMethod.genericReturnType.typeName) else emptyList()
+ }
+
+ private fun generateDefault(): String? {
+ if (isJVMPrimitive) {
+ return when (readMethod.genericReturnType) {
+ java.lang.Boolean.TYPE -> "false"
+ java.lang.Character.TYPE -> ""
+ else -> "0"
+ }
+ } else {
+ return null
+ }
+ }
+
+ private fun generateMandatory(): Boolean {
+ return isJVMPrimitive || !readMethod.returnsNullable()
+ }
+
+ private fun Method.returnsNullable(): Boolean {
+ val returnTypeString = this.declaringClass.kotlin.memberProperties.firstOrNull { it.javaGetter == this }?.returnType?.toString() ?: "?"
+ return returnTypeString.endsWith('?') || returnTypeString.endsWith('!')
+ }
+
+ companion object {
+ fun make(name: String, readMethod: Method): PropertySerializer {
+ val type = readMethod.genericReturnType
+ if (SerializerFactory.isPrimitive(type)) {
+ // This is a little inefficient for performance since it does a runtime check of type. We could do build time check with lots of subclasses here.
+ return AMQPPrimitivePropertySerializer(name, readMethod)
+ } else {
+ return DescribedTypePropertySerializer(name, readMethod)
+ }
+ }
+ }
+
+ /**
+ * A property serializer for a complex type (another object).
+ */
+ class DescribedTypePropertySerializer(name: String, readMethod: Method) : PropertySerializer(name, readMethod) {
+ override fun readProperty(obj: Any?, envelope: Envelope, input: DeserializationInput): Any? {
+ return input.readObjectOrNull(obj, envelope, readMethod.genericReturnType)
+ }
+
+ override fun writeProperty(obj: Any?, data: Data, output: SerializationOutput) {
+ output.writeObjectOrNull(readMethod.invoke(obj), data, readMethod.genericReturnType)
+ }
+ }
+
+ /**
+ * A property serializer for an AMQP primitive type (Int, String, etc).
+ */
+ class AMQPPrimitivePropertySerializer(name: String, readMethod: Method) : PropertySerializer(name, readMethod) {
+ override fun readProperty(obj: Any?, envelope: Envelope, input: DeserializationInput): Any? {
+ return obj
+ }
+
+ override fun writeProperty(obj: Any?, data: Data, output: SerializationOutput) {
+ data.putObject(readMethod.invoke(obj))
+ }
+ }
+}
+
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/Schema.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/Schema.kt
new file mode 100644
index 0000000000..64a28a7aae
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/Schema.kt
@@ -0,0 +1,357 @@
+package net.corda.core.serialization.amqp
+
+import com.google.common.hash.Hasher
+import com.google.common.hash.Hashing
+import net.corda.core.crypto.Base58
+import net.corda.core.serialization.OpaqueBytes
+import org.apache.qpid.proton.amqp.DescribedType
+import org.apache.qpid.proton.amqp.UnsignedLong
+import org.apache.qpid.proton.codec.Data
+import org.apache.qpid.proton.codec.DescribedTypeConstructor
+import java.io.NotSerializableException
+import java.lang.reflect.GenericArrayType
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+
+// TODO: get an assigned number as per AMQP spec
+val DESCRIPTOR_TOP_32BITS: Long = 0xc0da0000
+
+val DESCRIPTOR_DOMAIN: String = "net.corda"
+
+// "corda" + majorVersionByte + minorVersionMSB + minorVersionLSB
+val AmqpHeaderV1_0: OpaqueBytes = OpaqueBytes("corda\u0001\u0000\u0000".toByteArray())
+
+/**
+ * This class wraps all serialized data, so that the schema can be carried along with it. We will provide various internal utilities
+ * to decompose and recompose with/without schema etc so that e.g. we can store objects with a (relationally) normalised out schema to
+ * avoid excessive duplication.
+ */
+// TODO: make the schema parsing lazy since mostly schemas will have been seen before and we only need it if we don't recognise a type descriptor.
+data class Envelope(val obj: Any?, val schema: Schema) : DescribedType {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(1L or DESCRIPTOR_TOP_32BITS)
+ val DESCRIPTOR_OBJECT = Descriptor(null, DESCRIPTOR)
+
+ fun get(data: Data): Envelope {
+ val describedType = data.`object` as DescribedType
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ val list = describedType.described as List<*>
+ return newInstance(listOf(list[0], Schema.get(list[1]!!)))
+ }
+
+ override fun getTypeClass(): Class<*> = Envelope::class.java
+
+ override fun newInstance(described: Any?): Envelope {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ return Envelope(list[0], list[1] as Schema)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(obj, schema)
+}
+
+/**
+ * This and the classes below are OO representations of the AMQP XML schema described in the specification. Their
+ * [toString] representations generate the associated XML form.
+ */
+data class Schema(val types: List) : DescribedType {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(2L or DESCRIPTOR_TOP_32BITS)
+
+ fun get(obj: Any): Schema {
+ val describedType = obj as DescribedType
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ val list = describedType.described as List<*>
+ return newInstance(listOf((list[0] as List<*>).map { TypeNotation.get(it!!) }))
+ }
+
+ override fun getTypeClass(): Class<*> = Schema::class.java
+
+ override fun newInstance(described: Any?): Schema {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ @Suppress("UNCHECKED_CAST")
+ return Schema(list[0] as List)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(types)
+
+ override fun toString(): String = types.joinToString("\n")
+}
+
+data class Descriptor(val name: String?, val code: UnsignedLong?) : DescribedType {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(3L or DESCRIPTOR_TOP_32BITS)
+
+ fun get(obj: Any): Descriptor {
+ val describedType = obj as DescribedType
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ return newInstance(describedType.described)
+ }
+
+ override fun getTypeClass(): Class<*> = Descriptor::class.java
+
+ override fun newInstance(described: Any?): Descriptor {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ return Descriptor(list[0] as? String, list[1] as? UnsignedLong)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(name, code)
+
+ override fun toString(): String {
+ val sb = StringBuilder("")
+ return sb.toString()
+ }
+}
+
+data class Field(val name: String, val type: String, val requires: List, val default: String?, val label: String?, val mandatory: Boolean, val multiple: Boolean) : DescribedType {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(4L or DESCRIPTOR_TOP_32BITS)
+
+ fun get(obj: Any): Field {
+ val describedType = obj as DescribedType
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ return newInstance(describedType.described)
+ }
+
+ override fun getTypeClass(): Class<*> = Field::class.java
+
+ override fun newInstance(described: Any?): Field {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ @Suppress("UNCHECKED_CAST")
+ return Field(list[0] as String, list[1] as String, list[2] as List, list[3] as? String, list[4] as? String, list[5] as Boolean, list[6] as Boolean)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(name, type, requires, default, label, mandatory, multiple)
+
+ override fun toString(): String {
+ val sb = StringBuilder("")
+ return sb.toString()
+ }
+}
+
+sealed class TypeNotation : DescribedType {
+ companion object {
+ fun get(obj: Any): TypeNotation {
+ val describedType = obj as DescribedType
+ if (describedType.descriptor == CompositeType.DESCRIPTOR) {
+ return CompositeType.get(describedType)
+ } else if (describedType.descriptor == RestrictedType.DESCRIPTOR) {
+ return RestrictedType.get(describedType)
+ } else {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ }
+ }
+
+ abstract val name: String
+ abstract val label: String?
+ abstract val provides: List
+ abstract val descriptor: Descriptor
+}
+
+data class CompositeType(override val name: String, override val label: String?, override val provides: List, override val descriptor: Descriptor, val fields: List) : TypeNotation() {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(5L or DESCRIPTOR_TOP_32BITS)
+
+ fun get(describedType: DescribedType): CompositeType {
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ val list = describedType.described as List<*>
+ return newInstance(listOf(list[0], list[1], list[2], Descriptor.get(list[3]!!), (list[4] as List<*>).map { Field.get(it!!) }))
+ }
+
+ override fun getTypeClass(): Class<*> = CompositeType::class.java
+
+ override fun newInstance(described: Any?): CompositeType {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ @Suppress("UNCHECKED_CAST")
+ return CompositeType(list[0] as String, list[1] as? String, list[2] as List, list[3] as Descriptor, list[4] as List)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(name, label, provides, descriptor, fields)
+
+ override fun toString(): String {
+ val sb = StringBuilder("\n")
+ sb.append(" $descriptor\n")
+ for (field in fields) {
+ sb.append(" $field\n")
+ }
+ sb.append("")
+ return sb.toString()
+ }
+}
+
+data class RestrictedType(override val name: String, override val label: String?, override val provides: List, val source: String, override val descriptor: Descriptor, val choices: List) : TypeNotation() {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(6L or DESCRIPTOR_TOP_32BITS)
+
+ fun get(describedType: DescribedType): RestrictedType {
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ val list = describedType.described as List<*>
+ return newInstance(listOf(list[0], list[1], list[2], list[3], Descriptor.get(list[4]!!), (list[5] as List<*>).map { Choice.get(it!!) }))
+ }
+
+ override fun getTypeClass(): Class<*> = RestrictedType::class.java
+
+ override fun newInstance(described: Any?): RestrictedType {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ @Suppress("UNCHECKED_CAST")
+ return RestrictedType(list[0] as String, list[1] as? String, list[2] as List, list[3] as String, list[4] as Descriptor, list[5] as List)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(name, label, provides, source, descriptor, choices)
+
+ override fun toString(): String {
+ val sb = StringBuilder("\n")
+ sb.append(" $descriptor\n")
+ sb.append("")
+ return sb.toString()
+ }
+}
+
+data class Choice(val name: String, val value: String) : DescribedType {
+ companion object : DescribedTypeConstructor {
+ val DESCRIPTOR = UnsignedLong(7L or DESCRIPTOR_TOP_32BITS)
+
+ fun get(obj: Any): Choice {
+ val describedType = obj as DescribedType
+ if (describedType.descriptor != DESCRIPTOR) {
+ throw NotSerializableException("Unexpected descriptor ${describedType.descriptor}.")
+ }
+ return newInstance(describedType.described)
+ }
+
+ override fun getTypeClass(): Class<*> = Choice::class.java
+
+ override fun newInstance(described: Any?): Choice {
+ val list = described as? List<*> ?: throw IllegalStateException("Was expecting a list")
+ return Choice(list[0] as String, list[1] as String)
+ }
+ }
+
+ override fun getDescriptor(): Any = DESCRIPTOR
+
+ override fun getDescribed(): Any = listOf(name, value)
+
+ override fun toString(): String {
+ return ""
+ }
+}
+
+private val ARRAY_HASH: String = "Array = true"
+private val ALREADY_SEEN_HASH: String = "Already seen = true"
+private val NULLABLE_HASH: String = "Nullable = true"
+private val NOT_NULLABLE_HASH: String = "Nullable = false"
+private val ANY_TYPE_HASH: String = "Any type = true"
+
+/**
+ * The method generates a fingerprint for a given JVM [Type] that should be unique to the schema representation.
+ * Thus it only takes into account properties and types and only supports the same object graph subset as the overall
+ * serialization code.
+ *
+ * The idea being that even for two classes that share the same name but differ in a minor way, the fingerprint will be
+ * different.
+ */
+// TODO: write tests
+internal fun fingerprintForType(type: Type): String = Base58.encode(fingerprintForType(type, HashSet(), Hashing.murmur3_128().newHasher()).hash().asBytes())
+
+private fun fingerprintForType(type: Type, alreadySeen: MutableSet, hasher: Hasher): Hasher {
+ return if (type in alreadySeen) {
+ hasher.putUnencodedChars(ALREADY_SEEN_HASH)
+ } else {
+ alreadySeen += type
+ if (type is SerializerFactory.AnyType) {
+ hasher.putUnencodedChars(ANY_TYPE_HASH)
+ } else if (type is Class<*>) {
+ if (type.isArray) {
+ fingerprintForType(type.componentType, alreadySeen, hasher).putUnencodedChars(ARRAY_HASH)
+ } else if (SerializerFactory.isPrimitive(type)) {
+ hasher.putUnencodedChars(type.name)
+ } else if (Collection::class.java.isAssignableFrom(type) || Map::class.java.isAssignableFrom(type)) {
+ hasher.putUnencodedChars(type.name)
+ } else {
+ // Hash the class + properties + interfaces
+ propertiesForSerialization(constructorForDeserialization(type), type).fold(hasher.putUnencodedChars(type.name)) { orig, param ->
+ fingerprintForType(param.readMethod.genericReturnType, alreadySeen, orig).putUnencodedChars(param.name).putUnencodedChars(if (param.mandatory) NOT_NULLABLE_HASH else NULLABLE_HASH)
+ }
+ interfacesForSerialization(type).map { fingerprintForType(it, alreadySeen, hasher) }
+ hasher
+ }
+ } else if (type is ParameterizedType) {
+ // Hash the rawType + params
+ type.actualTypeArguments.fold(fingerprintForType(type.rawType, alreadySeen, hasher)) { orig, paramType -> fingerprintForType(paramType, alreadySeen, orig) }
+ } else if (type is GenericArrayType) {
+ // Hash the element type + some array hash
+ fingerprintForType(type.genericComponentType, alreadySeen, hasher).putUnencodedChars(ARRAY_HASH)
+ } else {
+ throw NotSerializableException("Don't know how to hash $type")
+ }
+ }
+}
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializationHelper.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializationHelper.kt
new file mode 100644
index 0000000000..107769cde7
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializationHelper.kt
@@ -0,0 +1,139 @@
+package net.corda.core.serialization.amqp
+
+import org.apache.qpid.proton.codec.Data
+import java.beans.Introspector
+import java.beans.PropertyDescriptor
+import java.io.NotSerializableException
+import java.lang.reflect.Modifier
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+import kotlin.reflect.KClass
+import kotlin.reflect.KFunction
+import kotlin.reflect.full.findAnnotation
+import kotlin.reflect.full.primaryConstructor
+import kotlin.reflect.jvm.javaType
+
+/**
+ * Annotation indicating a constructor to be used to reconstruct instances of a class during deserialization.
+ */
+@Target(AnnotationTarget.CONSTRUCTOR)
+@Retention(AnnotationRetention.RUNTIME)
+annotation class ConstructorForDeserialization
+
+/**
+ * Code for finding the constructor we will use for deserialization.
+ *
+ * If there's only one constructor, it selects that. If there are two and one is the default, it selects the other.
+ * Otherwise it starts with the primary constructor in kotlin, if there is one, and then will override this with any that is
+ * annotated with [@CordaConstructor]. It will report an error if more than one constructor is annotated.
+ */
+internal fun constructorForDeserialization(clazz: Class): KFunction? {
+ if (isConcrete(clazz)) {
+ var preferredCandidate: KFunction? = clazz.kotlin.primaryConstructor
+ var annotatedCount = 0
+ val kotlinConstructors = clazz.kotlin.constructors
+ val hasDefault = kotlinConstructors.any { it.parameters.isEmpty() }
+ for (kotlinConstructor in kotlinConstructors) {
+ if (preferredCandidate == null && kotlinConstructors.size == 1 && !hasDefault) {
+ preferredCandidate = kotlinConstructor
+ } else if (preferredCandidate == null && kotlinConstructors.size == 2 && hasDefault && kotlinConstructor.parameters.isNotEmpty()) {
+ preferredCandidate = kotlinConstructor
+ } else if (kotlinConstructor.findAnnotation() != null) {
+ if (annotatedCount++ > 0) {
+ throw NotSerializableException("More than one constructor for $clazz is annotated with @CordaConstructor.")
+ }
+ preferredCandidate = kotlinConstructor
+ }
+ }
+ return preferredCandidate ?: throw NotSerializableException("No constructor for deserialization found for $clazz.")
+ } else {
+ return null
+ }
+}
+
+/**
+ * Identifies the properties to be used during serialization by attempting to find those that match the parameters to the
+ * deserialization constructor, if the class is concrete. If it is abstract, or an interface, then use all the properties.
+ *
+ * Note, you will need any Java classes to be compiled with the `-parameters` option to ensure constructor parameters have
+ * names accessible via reflection.
+ */
+internal fun propertiesForSerialization(kotlinConstructor: KFunction?, clazz: Class<*>): Collection {
+ return if (kotlinConstructor != null) propertiesForSerialization(kotlinConstructor) else propertiesForSerialization(clazz)
+}
+
+private fun isConcrete(clazz: Class<*>): Boolean = !(clazz.isInterface || Modifier.isAbstract(clazz.modifiers))
+
+private fun propertiesForSerialization(kotlinConstructor: KFunction): Collection {
+ val clazz = (kotlinConstructor.returnType.classifier as KClass<*>).javaObjectType
+ // Kotlin reflection doesn't work with Java getters the way you might expect, so we drop back to good ol' beans.
+ val properties: Map = Introspector.getBeanInfo(clazz).propertyDescriptors.filter { it.name != "class" }.groupBy { it.name }.mapValues { it.value[0] }
+ val rc: MutableList = ArrayList(kotlinConstructor.parameters.size)
+ for (param in kotlinConstructor.parameters) {
+ val name = param.name ?: throw NotSerializableException("Constructor parameter of $clazz has no name.")
+ val matchingProperty = properties[name] ?: throw NotSerializableException("No property matching constructor parameter named $name of $clazz. If using Java, check that you have the -parameters option specified in the Java compiler.")
+ // Check that the method has a getter in java.
+ val getter = matchingProperty.readMethod ?: throw NotSerializableException("Property has no getter method for $name of $clazz. If using Java and the parameter name looks anonymous, check that you have the -parameters option specified in the Java compiler.")
+ if (getter.genericReturnType == param.type.javaType) {
+ rc += PropertySerializer.make(name, getter)
+ } else {
+ throw NotSerializableException("Property type ${getter.genericReturnType} for $name of $clazz differs from constructor parameter type ${param.type.javaType}")
+ }
+ }
+ return rc
+}
+
+private fun propertiesForSerialization(clazz: Class<*>): Collection {
+ // Kotlin reflection doesn't work with Java getters the way you might expect, so we drop back to good ol' beans.
+ val properties = Introspector.getBeanInfo(clazz).propertyDescriptors.filter { it.name != "class" }.sortedBy { it.name }
+ val rc: MutableList = ArrayList(properties.size)
+ for (property in properties) {
+ // Check that the method has a getter in java.
+ val getter = property.readMethod ?: throw NotSerializableException("Property has no getter method for ${property.name} of $clazz.")
+ rc += PropertySerializer.make(property.name, getter)
+ }
+ return rc
+}
+
+internal fun interfacesForSerialization(clazz: Class<*>): List {
+ val interfaces = LinkedHashSet()
+ exploreType(clazz, interfaces)
+ return interfaces.toList()
+}
+
+private fun exploreType(type: Type?, interfaces: MutableSet) {
+ val clazz = (type as? Class<*>) ?: (type as? ParameterizedType)?.rawType as? Class<*>
+ if (clazz != null) {
+ for (newInterface in clazz.genericInterfaces) {
+ if (newInterface !in interfaces) {
+ interfaces += newInterface
+ exploreType(newInterface, interfaces)
+ }
+ }
+ exploreType(clazz.genericSuperclass, interfaces)
+ }
+}
+
+/**
+ * Extension helper for writing described objects.
+ */
+fun Data.withDescribed(descriptor: Descriptor, block: Data.() -> Unit) {
+ // Write described
+ putDescribed()
+ enter()
+ // Write descriptor
+ putObject(descriptor.code ?: descriptor.name)
+ block()
+ exit() // exit described
+}
+
+/**
+ * Extension helper for writing lists.
+ */
+fun Data.withList(block: Data.() -> Unit) {
+ // Write list
+ putList()
+ enter()
+ block()
+ exit() // exit list
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializationOutput.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializationOutput.kt
new file mode 100644
index 0000000000..f440d62c2a
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializationOutput.kt
@@ -0,0 +1,85 @@
+package net.corda.core.serialization.amqp
+
+import net.corda.core.serialization.SerializedBytes
+import org.apache.qpid.proton.codec.Data
+import java.io.NotSerializableException
+import java.lang.reflect.Type
+import java.nio.ByteBuffer
+import java.util.*
+import kotlin.collections.LinkedHashSet
+
+/**
+ * Main entry point for serializing an object to AMQP.
+ *
+ * @param serializerFactory This is the factory for [AMQPSerializer] instances and can be shared across multiple
+ * instances and threads.
+ */
+class SerializationOutput(private val serializerFactory: SerializerFactory = SerializerFactory()) {
+ // TODO: we're not supporting object refs yet
+ private val objectHistory: MutableMap = IdentityHashMap()
+ private val serializerHistory: MutableSet = LinkedHashSet()
+ private val schemaHistory: MutableSet = LinkedHashSet()
+
+ /**
+ * Serialize the given object to AMQP, wrapped in our [Envelope] wrapper which carries an AMQP 1.0 schema, and prefixed
+ * with a header to indicate that this is serialized with AMQP and not [Kryo], and what version of the Corda implementation
+ * of AMQP serialization contructed the serialized form.
+ */
+ @Throws(NotSerializableException::class)
+ fun serialize(obj: T): SerializedBytes {
+ try {
+ val data = Data.Factory.create()
+ data.withDescribed(Envelope.DESCRIPTOR_OBJECT) {
+ withList {
+ // Our object
+ writeObject(obj, this)
+ // The schema
+ putObject(Schema(schemaHistory.toList()))
+ }
+ }
+ val bytes = ByteArray(data.encodedSize().toInt() + 8)
+ val buf = ByteBuffer.wrap(bytes)
+ buf.put(AmqpHeaderV1_0.bytes)
+ data.encode(buf)
+ return SerializedBytes(bytes)
+ } finally {
+ objectHistory.clear()
+ serializerHistory.clear()
+ schemaHistory.clear()
+ }
+ }
+
+ internal fun writeObject(obj: Any, data: Data) {
+ writeObject(obj, data, obj.javaClass)
+ }
+
+ internal fun writeObjectOrNull(obj: Any?, data: Data, type: Type) {
+ if (obj == null) {
+ data.putNull()
+ } else {
+ writeObject(obj, data, if (type == SerializerFactory.AnyType) obj.javaClass else type)
+ }
+ }
+
+ internal fun writeObject(obj: Any, data: Data, type: Type) {
+ val serializer = serializerFactory.get(obj.javaClass, type)
+ if (serializer !in serializerHistory) {
+ serializer.writeClassInfo(this)
+ }
+ serializer.writeObject(obj, data, type, this)
+ }
+
+ internal fun writeTypeNotations(vararg typeNotation: TypeNotation): Boolean {
+ return schemaHistory.addAll(typeNotation)
+ }
+
+ internal fun requireSerializer(type: Type) {
+ if (type != SerializerFactory.AnyType) {
+ val serializer = serializerFactory.get(null, type)
+ if (serializer !in serializerHistory) {
+ serializer.writeClassInfo(this)
+ }
+ }
+ }
+}
+
diff --git a/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializerFactory.kt b/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializerFactory.kt
new file mode 100644
index 0000000000..1456c9a7ca
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/serialization/amqp/SerializerFactory.kt
@@ -0,0 +1,196 @@
+package net.corda.core.serialization.amqp
+
+import com.google.common.primitives.Primitives
+import net.corda.core.checkNotUnorderedHashMap
+import net.corda.core.serialization.AllWhitelist
+import net.corda.core.serialization.ClassWhitelist
+import net.corda.core.serialization.CordaSerializable
+import org.apache.qpid.proton.amqp.*
+import java.io.NotSerializableException
+import java.lang.reflect.GenericArrayType
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import javax.annotation.concurrent.ThreadSafe
+
+/**
+ * Factory of serializers designed to be shared across threads and invocations.
+ */
+// TODO: object references
+// TODO: class references? (e.g. cheat with repeated descriptors using a long encoding, like object ref proposal)
+// TODO: Inner classes etc
+// TODO: support for custom serialisation of core types (of e.g. PublicKey, Throwables)
+// TODO: exclude schemas for core types that don't need custom serializers that everyone already knows the schema for.
+// TODO: support for intern-ing of deserialized objects for some core types (e.g. PublicKey) for memory efficiency
+// TODO: maybe support for caching of serialized form of some core types for performance
+// TODO: profile for performance in general
+// TODO: use guava caches etc so not unbounded
+// TODO: do we need to support a transient annotation to exclude certain properties?
+// TODO: incorporate the class carpenter for classes not on the classpath.
+// TODO: apply class loader logic and an "app context" throughout this code.
+// TODO: schema evolution solution when the fingerprints do not line up.
+@ThreadSafe
+class SerializerFactory(val whitelist: ClassWhitelist = AllWhitelist) {
+ private val serializersByType = ConcurrentHashMap()
+ private val serializersByDescriptor = ConcurrentHashMap()
+
+ /**
+ * Look up, and manufacture if necessary, a serializer for the given type.
+ *
+ * @param actualType Will be null if there isn't an actual object instance available (e.g. for
+ * restricted type processing).
+ */
+ @Throws(NotSerializableException::class)
+ fun get(actualType: Class<*>?, declaredType: Type): AMQPSerializer {
+ if (declaredType is ParameterizedType) {
+ return serializersByType.computeIfAbsent(declaredType) {
+ // We allow only Collection and Map.
+ val rawType = declaredType.rawType
+ if (rawType is Class<*>) {
+ checkParameterisedTypesConcrete(declaredType.actualTypeArguments)
+ if (Collection::class.java.isAssignableFrom(rawType)) {
+ CollectionSerializer(declaredType)
+ } else if (Map::class.java.isAssignableFrom(rawType)) {
+ makeMapSerializer(declaredType)
+ } else {
+ throw NotSerializableException("Declared types of $declaredType are not supported.")
+ }
+ } else {
+ throw NotSerializableException("Declared types of $declaredType are not supported.")
+ }
+ }
+ } else if (declaredType is Class<*>) {
+ // Simple classes allowed
+ if (Collection::class.java.isAssignableFrom(declaredType)) {
+ return serializersByType.computeIfAbsent(declaredType) { CollectionSerializer(DeserializedParameterizedType(declaredType, arrayOf(AnyType), null)) }
+ } else if (Map::class.java.isAssignableFrom(declaredType)) {
+ return serializersByType.computeIfAbsent(declaredType) { makeMapSerializer(DeserializedParameterizedType(declaredType, arrayOf(AnyType, AnyType), null)) }
+ } else {
+ return makeClassSerializer(actualType ?: declaredType)
+ }
+ } else if (declaredType is GenericArrayType) {
+ return serializersByType.computeIfAbsent(declaredType) { ArraySerializer(declaredType) }
+ } else {
+ throw NotSerializableException("Declared types of $declaredType are not supported.")
+ }
+ }
+
+ @Throws(NotSerializableException::class)
+ fun get(typeDescriptor: Any, envelope: Envelope): AMQPSerializer {
+ return serializersByDescriptor[typeDescriptor] ?: {
+ processSchema(envelope.schema)
+ serializersByDescriptor[typeDescriptor] ?: throw NotSerializableException("Could not find type matching descriptor $typeDescriptor.")
+ }()
+ }
+
+ private fun processSchema(schema: Schema) {
+ for (typeNotation in schema.types) {
+ processSchemaEntry(typeNotation)
+ }
+ }
+
+ private fun processSchemaEntry(typeNotation: TypeNotation) {
+ when (typeNotation) {
+ is CompositeType -> processCompositeType(typeNotation) // java.lang.Class (whether a class or interface)
+ is RestrictedType -> processRestrictedType(typeNotation) // Collection / Map, possibly with generics
+ }
+ }
+
+ private fun restrictedTypeForName(name: String): Type {
+ return if (name.endsWith("[]")) {
+ DeserializedGenericArrayType(restrictedTypeForName(name.substring(0, name.lastIndex - 1)))
+ } else {
+ DeserializedParameterizedType.make(name)
+ }
+ }
+
+ private fun processRestrictedType(typeNotation: RestrictedType) {
+ serializersByDescriptor.computeIfAbsent(typeNotation.descriptor.name!!) {
+ // TODO: class loader logic, and compare the schema.
+ val type = restrictedTypeForName(typeNotation.name)
+ get(null, type)
+ }
+ }
+
+ private fun processCompositeType(typeNotation: CompositeType) {
+ serializersByDescriptor.computeIfAbsent(typeNotation.descriptor.name!!) {
+ // TODO: class loader logic, and compare the schema.
+ val clazz = Class.forName(typeNotation.name)
+ get(clazz, clazz)
+ }
+ }
+
+ private fun checkParameterisedTypesConcrete(actualTypeArguments: Array) {
+ for (type in actualTypeArguments) {
+ // Needs to be another parameterised type or a class, or any type.
+ if (type !is Class<*>) {
+ if (type is ParameterizedType) {
+ checkParameterisedTypesConcrete(type.actualTypeArguments)
+ } else if (type != AnyType) {
+ throw NotSerializableException("Declared parameterised types containing $type as a parameter are not supported.")
+ }
+ }
+ }
+ }
+
+ private fun makeClassSerializer(clazz: Class<*>): AMQPSerializer {
+ return serializersByType.computeIfAbsent(clazz) {
+ if (clazz.isArray) {
+ whitelisted(clazz.componentType)
+ ArraySerializer(clazz)
+ } else if (isPrimitive(clazz)) {
+ AMQPPrimitiveSerializer(clazz)
+ } else {
+ whitelisted(clazz)
+ ObjectSerializer(clazz)
+ }
+ }
+ }
+
+ private fun whitelisted(clazz: Class<*>): Boolean {
+ if (whitelist.hasListed(clazz) || clazz.isAnnotationPresent(CordaSerializable::class.java)) {
+ return true
+ } else {
+ throw NotSerializableException("Class $clazz is not on the whitelist or annotated with @CordaSerializable.")
+ }
+ }
+
+ private fun makeMapSerializer(declaredType: ParameterizedType): AMQPSerializer {
+ val rawType = declaredType.rawType as Class<*>
+ rawType.checkNotUnorderedHashMap()
+ return MapSerializer(declaredType)
+ }
+
+ companion object {
+ fun isPrimitive(type: Type): Boolean = type is Class<*> && Primitives.wrap(type) in primitiveTypeNames
+
+ fun primitiveTypeName(type: Type): String? = primitiveTypeNames[type as? Class<*>]
+
+ private val primitiveTypeNames: Map, String> = mapOf(
+ Boolean::class.java to "boolean",
+ Byte::class.java to "byte",
+ UnsignedByte::class.java to "ubyte",
+ Short::class.java to "short",
+ UnsignedShort::class.java to "ushort",
+ Integer::class.java to "int",
+ UnsignedInteger::class.java to "uint",
+ Long::class.java to "long",
+ UnsignedLong::class.java to "ulong",
+ Float::class.java to "float",
+ Double::class.java to "double",
+ Decimal32::class.java to "decimal32",
+ Decimal64::class.java to "decimal62",
+ Decimal128::class.java to "decimal128",
+ Char::class.java to "char",
+ Date::class.java to "timestamp",
+ UUID::class.java to "uuid",
+ ByteArray::class.java to "binary",
+ String::class.java to "string",
+ Symbol::class.java to "symbol")
+ }
+
+ object AnyType : Type {
+ override fun toString(): String = "*"
+ }
+}
diff --git a/core/src/test/java/net/corda/core/serialization/amqp/JavaSerializationOutputTests.java b/core/src/test/java/net/corda/core/serialization/amqp/JavaSerializationOutputTests.java
new file mode 100644
index 0000000000..fe9e9f07a1
--- /dev/null
+++ b/core/src/test/java/net/corda/core/serialization/amqp/JavaSerializationOutputTests.java
@@ -0,0 +1,226 @@
+package net.corda.core.serialization.amqp;
+
+import net.corda.core.serialization.SerializedBytes;
+import org.apache.qpid.proton.codec.DecoderImpl;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.io.NotSerializableException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import static org.junit.Assert.assertTrue;
+
+public class JavaSerializationOutputTests {
+
+ static class Foo {
+ private final String bob;
+ private final int count;
+
+ public Foo(String msg, long count) {
+ this.bob = msg;
+ this.count = (int) count;
+ }
+
+ @ConstructorForDeserialization
+ public Foo(String fred, int count) {
+ this.bob = fred;
+ this.count = count;
+ }
+
+ public String getFred() {
+ return bob;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Foo foo = (Foo) o;
+
+ if (count != foo.count) return false;
+ return bob != null ? bob.equals(foo.bob) : foo.bob == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = bob != null ? bob.hashCode() : 0;
+ result = 31 * result + count;
+ return result;
+ }
+ }
+
+ static class UnAnnotatedFoo {
+ private final String bob;
+ private final int count;
+
+ public UnAnnotatedFoo(String fred, int count) {
+ this.bob = fred;
+ this.count = count;
+ }
+
+ public String getFred() {
+ return bob;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ UnAnnotatedFoo foo = (UnAnnotatedFoo) o;
+
+ if (count != foo.count) return false;
+ return bob != null ? bob.equals(foo.bob) : foo.bob == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = bob != null ? bob.hashCode() : 0;
+ result = 31 * result + count;
+ return result;
+ }
+ }
+
+ static class BoxedFoo {
+ private final String fred;
+ private final Integer count;
+
+ public BoxedFoo(String fred, Integer count) {
+ this.fred = fred;
+ this.count = count;
+ }
+
+ public String getFred() {
+ return fred;
+ }
+
+ public Integer getCount() {
+ return count;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ BoxedFoo boxedFoo = (BoxedFoo) o;
+
+ if (fred != null ? !fred.equals(boxedFoo.fred) : boxedFoo.fred != null) return false;
+ return count != null ? count.equals(boxedFoo.count) : boxedFoo.count == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = fred != null ? fred.hashCode() : 0;
+ result = 31 * result + (count != null ? count.hashCode() : 0);
+ return result;
+ }
+ }
+
+
+ static class BoxedFooNotNull {
+ private final String fred;
+ private final Integer count;
+
+ public BoxedFooNotNull(String fred, Integer count) {
+ this.fred = fred;
+ this.count = count;
+ }
+
+ public String getFred() {
+ return fred;
+ }
+
+ @Nonnull
+ public Integer getCount() {
+ return count;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ BoxedFooNotNull boxedFoo = (BoxedFooNotNull) o;
+
+ if (fred != null ? !fred.equals(boxedFoo.fred) : boxedFoo.fred != null) return false;
+ return count != null ? count.equals(boxedFoo.count) : boxedFoo.count == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = fred != null ? fred.hashCode() : 0;
+ result = 31 * result + (count != null ? count.hashCode() : 0);
+ return result;
+ }
+ }
+
+ private Object serdes(Object obj) throws NotSerializableException {
+ SerializerFactory factory = new SerializerFactory();
+ SerializationOutput ser = new SerializationOutput(factory);
+ SerializedBytes