diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt index e898c7b610..f8b28eb37b 100644 --- a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt +++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt @@ -27,6 +27,7 @@ import net.corda.smoketesting.NodeProcess import org.apache.commons.io.output.NullOutputStream import org.junit.After import org.junit.Before +import org.junit.Ignore import org.junit.Test import java.io.FilterInputStream import java.io.InputStream @@ -94,8 +95,24 @@ class StandaloneCordaRPClientTest { financeJar.copyToDirectory(cordappsDir) } + @Test fun `test attachments`() { + val attachment = InputStreamAndHash.createInMemoryTestZip(attachmentSize, 1) + assertFalse(rpcProxy.attachmentExists(attachment.sha256)) + val id = attachment.inputStream.use { rpcProxy.uploadAttachment(it) } + assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash") + + val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it -> + it.copyTo(NullOutputStream()) + SecureHash.SHA256(it.hash().asBytes()) + } + assertEquals(attachment.sha256, hash) + } + + @Ignore("CORDA-1520 - After switching from Kryo to AMQP this test won't work") + @Test + fun `test wrapped attachments`() { val attachment = InputStreamAndHash.createInMemoryTestZip(attachmentSize, 1) assertFalse(rpcProxy.attachmentExists(attachment.sha256)) val id = WrapperStream(attachment.inputStream).use { rpcProxy.uploadAttachment(it) } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt index ac1fa6eb15..822bb3d855 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/DeserializationInput.kt @@ -5,6 +5,7 @@ import net.corda.core.serialization.EncodingWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.loggerFor import net.corda.serialization.internal.* import org.apache.qpid.proton.amqp.Binary import org.apache.qpid.proton.amqp.DescribedType @@ -29,6 +30,7 @@ data class ObjectAndEnvelope(val obj: T, val envelope: Envelope) class DeserializationInput @JvmOverloads constructor(private val serializerFactory: SerializerFactory, private val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) { private val objectHistory: MutableList = mutableListOf() + private val logger = loggerFor() companion object { @VisibleForTesting @@ -73,7 +75,6 @@ class DeserializationInput @JvmOverloads constructor(private val serializerFacto inline fun deserialize(bytes: SerializedBytes, context: SerializationContext): T = deserialize(bytes, T::class.java, context) - @Throws(NotSerializableException::class) private fun des(generator: () -> R): R { try { @@ -96,6 +97,9 @@ class DeserializationInput @JvmOverloads constructor(private val serializerFacto fun deserialize(bytes: ByteSequence, clazz: Class, context: SerializationContext): T = des { val envelope = getEnvelope(bytes, encodingWhitelist) + + logger.trace("deserialize blob scheme=\"${envelope.schema.toString()}\"") + clazz.cast(readObjectOrNull(envelope.obj, SerializationSchemas(envelope.schema, envelope.transformsSchema), clazz, context)) } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/EvolutionSerializer.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/EvolutionSerializer.kt index 62914e1518..7a99b142ff 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/EvolutionSerializer.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/EvolutionSerializer.kt @@ -275,9 +275,13 @@ class EvolutionSerializerGetter : EvolutionSerializerGetterBase() { // both the new and old fingerprint if (newSerializer is CollectionSerializer || newSerializer is MapSerializer) { newSerializer - } else { + } else if (newSerializer is EnumSerializer){ EnumEvolutionSerializer.make(typeNotation, newSerializer, factory, schemas) } + else { + loggerFor().error("typeNotation=${typeNotation.name} Need to evolve unsupported type") + throw NotSerializableException ("${typeNotation.name} cannot be evolved") + } } } } diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt index 009268c5e3..1c6156f818 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/Schema.kt @@ -153,7 +153,13 @@ sealed class TypeNotation : DescribedType { abstract val descriptor: Descriptor } -data class CompositeType(override val name: String, override val label: String?, override val provides: List, override val descriptor: Descriptor, val fields: List) : TypeNotation() { +data class CompositeType( + override val name: String, + override val label: String?, + override val provides: List, + override val descriptor: Descriptor, + val fields: List +) : TypeNotation() { companion object : DescribedTypeConstructor { val DESCRIPTOR = AMQPDescriptorRegistry.COMPOSITE_TYPE.amqpDescriptor diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactory.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactory.kt index 7162f27afb..b63e0f12e9 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactory.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/SerializerFactory.kt @@ -5,6 +5,7 @@ import com.google.common.reflect.TypeResolver import net.corda.core.internal.uncheckedCast import net.corda.core.serialization.ClassWhitelist import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.trace import net.corda.serialization.internal.carpenter.* import org.apache.qpid.proton.amqp.* import java.io.NotSerializableException @@ -54,6 +55,7 @@ open class SerializerFactory( serializersByDescriptor = ConcurrentHashMap(), customSerializers = CopyOnWriteArrayList(), transformsCache = ConcurrentHashMap()) + constructor(whitelist: ClassWhitelist, classLoader: ClassLoader, evolutionSerializerGetter: EvolutionSerializerGetterBase = EvolutionSerializerGetter(), @@ -74,6 +76,8 @@ open class SerializerFactory( private fun getEvolutionSerializer(typeNotation: TypeNotation, newSerializer: AMQPSerializer, schemas: SerializationSchemas) = evolutionSerializerGetter.getEvolutionSerializer(this, typeNotation, newSerializer, schemas) + private val logger = loggerFor() + /** * Look up, and manufacture if necessary, a serializer for the given type. * @@ -82,6 +86,9 @@ open class SerializerFactory( */ @Throws(NotSerializableException::class) fun get(actualClass: Class<*>?, declaredType: Type): AMQPSerializer { + // can be useful to enable but will be *extremely* chatty if you do + logger.trace { "Get Serializer for $actualClass ${declaredType.typeName}" } + val declaredClass = declaredType.asClass() ?: throw NotSerializableException( "Declared types of $declaredType are not supported.") @@ -107,10 +114,15 @@ open class SerializerFactory( makeMapSerializer(declaredTypeAmended) } } - Enum::class.java.isAssignableFrom(actualClass - ?: declaredClass) -> serializersByType.computeIfAbsent(actualClass ?: declaredClass) { - whitelist.requireWhitelisted(actualType) - EnumSerializer(actualType, actualClass ?: declaredClass, this) + Enum::class.java.isAssignableFrom(actualClass ?: declaredClass) -> { + logger.debug("class=[${actualClass?.simpleName} | $declaredClass] is an enumeration " + + "declaredType=${declaredType.typeName} " + + "isEnum=${declaredType::class.java.isEnum}") + + serializersByType.computeIfAbsent(actualClass ?: declaredClass) { + whitelist.requireWhitelisted(actualType) + EnumSerializer(actualType, actualClass ?: declaredClass, this) + } } else -> { makeClassSerializer(actualClass ?: declaredClass, actualType, declaredType) @@ -198,6 +210,7 @@ open class SerializerFactory( @Throws(NotSerializableException::class) fun get(typeDescriptor: Any, schema: SerializationSchemas): AMQPSerializer { return serializersByDescriptor[typeDescriptor] ?: { + logger.trace("get Serializer descriptor=${typeDescriptor}") processSchema(FactorySchemaAndDescriptor(schema, typeDescriptor)) serializersByDescriptor[typeDescriptor] ?: throw NotSerializableException( "Could not find type matching descriptor $typeDescriptor.") @@ -232,16 +245,24 @@ open class SerializerFactory( private fun processSchema(schemaAndDescriptor: FactorySchemaAndDescriptor, sentinel: Boolean = false) { val metaSchema = CarpenterMetaSchema.newInstance() for (typeNotation in schemaAndDescriptor.schemas.schema.types) { + logger.trace("descriptor=${schemaAndDescriptor.typeDescriptor}, typeNotation=${typeNotation.name}") try { val serialiser = processSchemaEntry(typeNotation) // if we just successfully built a serializer for the type but the type fingerprint // doesn't match that of the serialised object then we are dealing with different // instance of the class, as such we need to build an EvolutionSerializer if (serialiser.typeDescriptor != typeNotation.descriptor.name) { + logger.info("typeNotation=${typeNotation.name} action=\"requires Evolution\"") getEvolutionSerializer(typeNotation, serialiser, schemaAndDescriptor.schemas) } } catch (e: ClassNotFoundException) { - if (sentinel) throw e + if (sentinel) { + logger.error("typeNotation=${typeNotation.name} error=\"after Carpentry attempt failed to load\"") + throw e + } + else { + logger.info("typeNotation=\"${typeNotation.name}\" action=\"carpentry required\"") + } metaSchema.buildFor(typeNotation, classloader) } } @@ -270,8 +291,16 @@ open class SerializerFactory( } private fun processSchemaEntry(typeNotation: TypeNotation) = when (typeNotation) { - is CompositeType -> processCompositeType(typeNotation) // java.lang.Class (whether a class or interface) - is RestrictedType -> processRestrictedType(typeNotation) // Collection / Map, possibly with generics + // java.lang.Class (whether a class or interface) + is CompositeType -> { + logger.trace("typeNotation=${typeNotation.name} amqpType=CompositeType") + processCompositeType(typeNotation) + } + // Collection / Map, possibly with generics + is RestrictedType -> { + logger.trace("typeNotation=${typeNotation.name} amqpType=RestrictedType") + processRestrictedType(typeNotation) + } } // TODO: class loader logic, and compare the schema. @@ -285,6 +314,7 @@ open class SerializerFactory( } private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer = serializersByType.computeIfAbsent(type) { + logger.debug("class=${clazz.simpleName}, type=$type is a composite type") if (clazz.isSynthetic) { // Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also // captures Lambda expressions and other anonymous functions diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/custom/InputStreamSerializer.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/custom/InputStreamSerializer.kt index cc100d551e..1a7f5fce89 100644 --- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/custom/InputStreamSerializer.kt +++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/custom/InputStreamSerializer.kt @@ -14,7 +14,15 @@ import java.lang.reflect.Type object InputStreamSerializer : CustomSerializer.Implements(InputStream::class.java) { override val revealSubclassesInSchema: Boolean = true - override val schemaForDocumentation = Schema(listOf(RestrictedType(type.toString(), "", listOf(type.toString()), SerializerFactory.primitiveTypeName(ByteArray::class.java)!!, descriptor, emptyList()))) + override val schemaForDocumentation = Schema( + listOf( + RestrictedType( + type.toString(), + "", + listOf(type.toString()), + SerializerFactory.primitiveTypeName(ByteArray::class.java)!!, + descriptor, + emptyList()))) override fun writeDescribedObject(obj: InputStream, data: Data, type: Type, output: SerializationOutput, context: SerializationContext diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/StreamTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/StreamTests.kt new file mode 100644 index 0000000000..4ac37e433b --- /dev/null +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/StreamTests.kt @@ -0,0 +1,53 @@ +package net.corda.serialization.internal.amqp + +import net.corda.core.internal.InputStreamAndHash +import net.corda.serialization.internal.amqp.custom.InputStreamSerializer +import net.corda.serialization.internal.amqp.testutils.TestSerializationOutput +import net.corda.serialization.internal.amqp.testutils.deserialize +import net.corda.serialization.internal.amqp.testutils.testDefaultFactory +import org.junit.Test +import java.io.FilterInputStream +import java.io.InputStream + +class StreamTests { + + private class WrapperStream(input: InputStream) : FilterInputStream(input) + + @Test + fun inputStream() { + val attachment = InputStreamAndHash.createInMemoryTestZip(2116, 1) + val id : InputStream = WrapperStream(attachment.inputStream) + + val serializerFactory = testDefaultFactory().apply { + register(InputStreamSerializer) + } + + val bytes = TestSerializationOutput(true, serializerFactory).serialize(id) + + val deserializerFactory = testDefaultFactory().apply { + register(InputStreamSerializer) + } + + DeserializationInput(serializerFactory).deserialize(bytes) + DeserializationInput(deserializerFactory).deserialize(bytes) + } + + @Test + fun listInputStream() { + val attachment = InputStreamAndHash.createInMemoryTestZip(2116, 1) + val id /* : List */= listOf(WrapperStream(attachment.inputStream)) + + val serializerFactory = testDefaultFactory().apply { + register(InputStreamSerializer) + } + + val bytes = TestSerializationOutput(true, serializerFactory).serialize(id) + + val deserializerFactory = testDefaultFactory().apply { + register(InputStreamSerializer) + } + + DeserializationInput(serializerFactory).deserialize(bytes) + DeserializationInput(deserializerFactory).deserialize(bytes) + } +} \ No newline at end of file diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt index 68e4227172..8d78ed61c6 100644 --- a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt +++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/testutils/AMQPTestUtils.kt @@ -30,6 +30,15 @@ class TestSerializationOutput( } super.writeTransformSchema(transformsSchema, data) } + + @Throws(NotSerializableException::class) + fun serialize(obj: T): SerializedBytes { + try { + return _serialize(obj, testSerializationContext) + } finally { + andFinally() + } + } } fun testName(): String = Thread.currentThread().stackTrace[2].methodName