mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
CORDA-1521 - Fix rpc attachment smoke test / better AMQP logging (#3213)
* CORDA-1521 - Fix rpc attachment smoke test / better AMQP logging * Remove poor debug message * Review comments * reduce debug spam
This commit is contained in:
parent
a3d88f752d
commit
7cbc316b9d
@ -27,6 +27,7 @@ import net.corda.smoketesting.NodeProcess
|
||||
import org.apache.commons.io.output.NullOutputStream
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.io.FilterInputStream
|
||||
import java.io.InputStream
|
||||
@ -94,8 +95,24 @@ class StandaloneCordaRPClientTest {
|
||||
financeJar.copyToDirectory(cordappsDir)
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun `test attachments`() {
|
||||
val attachment = InputStreamAndHash.createInMemoryTestZip(attachmentSize, 1)
|
||||
assertFalse(rpcProxy.attachmentExists(attachment.sha256))
|
||||
val id = attachment.inputStream.use { rpcProxy.uploadAttachment(it) }
|
||||
assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash")
|
||||
|
||||
val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it ->
|
||||
it.copyTo(NullOutputStream())
|
||||
SecureHash.SHA256(it.hash().asBytes())
|
||||
}
|
||||
assertEquals(attachment.sha256, hash)
|
||||
}
|
||||
|
||||
@Ignore("CORDA-1520 - After switching from Kryo to AMQP this test won't work")
|
||||
@Test
|
||||
fun `test wrapped attachments`() {
|
||||
val attachment = InputStreamAndHash.createInMemoryTestZip(attachmentSize, 1)
|
||||
assertFalse(rpcProxy.attachmentExists(attachment.sha256))
|
||||
val id = WrapperStream(attachment.inputStream).use { rpcProxy.uploadAttachment(it) }
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.serialization.EncodingWhitelist
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.serialization.internal.*
|
||||
import org.apache.qpid.proton.amqp.Binary
|
||||
import org.apache.qpid.proton.amqp.DescribedType
|
||||
@ -29,6 +30,7 @@ data class ObjectAndEnvelope<out T>(val obj: T, val envelope: Envelope)
|
||||
class DeserializationInput @JvmOverloads constructor(private val serializerFactory: SerializerFactory,
|
||||
private val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) {
|
||||
private val objectHistory: MutableList<Any> = mutableListOf()
|
||||
private val logger = loggerFor<DeserializationInput>()
|
||||
|
||||
companion object {
|
||||
@VisibleForTesting
|
||||
@ -73,7 +75,6 @@ class DeserializationInput @JvmOverloads constructor(private val serializerFacto
|
||||
inline fun <reified T : Any> deserialize(bytes: SerializedBytes<T>, context: SerializationContext): T =
|
||||
deserialize(bytes, T::class.java, context)
|
||||
|
||||
|
||||
@Throws(NotSerializableException::class)
|
||||
private fun <R> des(generator: () -> R): R {
|
||||
try {
|
||||
@ -96,6 +97,9 @@ class DeserializationInput @JvmOverloads constructor(private val serializerFacto
|
||||
fun <T : Any> deserialize(bytes: ByteSequence, clazz: Class<T>, context: SerializationContext): T =
|
||||
des {
|
||||
val envelope = getEnvelope(bytes, encodingWhitelist)
|
||||
|
||||
logger.trace("deserialize blob scheme=\"${envelope.schema.toString()}\"")
|
||||
|
||||
clazz.cast(readObjectOrNull(envelope.obj, SerializationSchemas(envelope.schema, envelope.transformsSchema),
|
||||
clazz, context))
|
||||
}
|
||||
|
@ -275,9 +275,13 @@ class EvolutionSerializerGetter : EvolutionSerializerGetterBase() {
|
||||
// both the new and old fingerprint
|
||||
if (newSerializer is CollectionSerializer || newSerializer is MapSerializer) {
|
||||
newSerializer
|
||||
} else {
|
||||
} else if (newSerializer is EnumSerializer){
|
||||
EnumEvolutionSerializer.make(typeNotation, newSerializer, factory, schemas)
|
||||
}
|
||||
else {
|
||||
loggerFor<SerializerFactory>().error("typeNotation=${typeNotation.name} Need to evolve unsupported type")
|
||||
throw NotSerializableException ("${typeNotation.name} cannot be evolved")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -153,7 +153,13 @@ sealed class TypeNotation : DescribedType {
|
||||
abstract val descriptor: Descriptor
|
||||
}
|
||||
|
||||
data class CompositeType(override val name: String, override val label: String?, override val provides: List<String>, override val descriptor: Descriptor, val fields: List<Field>) : TypeNotation() {
|
||||
data class CompositeType(
|
||||
override val name: String,
|
||||
override val label: String?,
|
||||
override val provides: List<String>,
|
||||
override val descriptor: Descriptor,
|
||||
val fields: List<Field>
|
||||
) : TypeNotation() {
|
||||
companion object : DescribedTypeConstructor<CompositeType> {
|
||||
val DESCRIPTOR = AMQPDescriptorRegistry.COMPOSITE_TYPE.amqpDescriptor
|
||||
|
||||
|
@ -5,6 +5,7 @@ import com.google.common.reflect.TypeResolver
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.ClassWhitelist
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.serialization.internal.carpenter.*
|
||||
import org.apache.qpid.proton.amqp.*
|
||||
import java.io.NotSerializableException
|
||||
@ -54,6 +55,7 @@ open class SerializerFactory(
|
||||
serializersByDescriptor = ConcurrentHashMap(),
|
||||
customSerializers = CopyOnWriteArrayList(),
|
||||
transformsCache = ConcurrentHashMap())
|
||||
|
||||
constructor(whitelist: ClassWhitelist,
|
||||
classLoader: ClassLoader,
|
||||
evolutionSerializerGetter: EvolutionSerializerGetterBase = EvolutionSerializerGetter(),
|
||||
@ -74,6 +76,8 @@ open class SerializerFactory(
|
||||
private fun getEvolutionSerializer(typeNotation: TypeNotation, newSerializer: AMQPSerializer<Any>,
|
||||
schemas: SerializationSchemas) = evolutionSerializerGetter.getEvolutionSerializer(this, typeNotation, newSerializer, schemas)
|
||||
|
||||
private val logger = loggerFor<SerializerFactory>()
|
||||
|
||||
/**
|
||||
* Look up, and manufacture if necessary, a serializer for the given type.
|
||||
*
|
||||
@ -82,6 +86,9 @@ open class SerializerFactory(
|
||||
*/
|
||||
@Throws(NotSerializableException::class)
|
||||
fun get(actualClass: Class<*>?, declaredType: Type): AMQPSerializer<Any> {
|
||||
// can be useful to enable but will be *extremely* chatty if you do
|
||||
logger.trace { "Get Serializer for $actualClass ${declaredType.typeName}" }
|
||||
|
||||
val declaredClass = declaredType.asClass() ?: throw NotSerializableException(
|
||||
"Declared types of $declaredType are not supported.")
|
||||
|
||||
@ -107,10 +114,15 @@ open class SerializerFactory(
|
||||
makeMapSerializer(declaredTypeAmended)
|
||||
}
|
||||
}
|
||||
Enum::class.java.isAssignableFrom(actualClass
|
||||
?: declaredClass) -> serializersByType.computeIfAbsent(actualClass ?: declaredClass) {
|
||||
whitelist.requireWhitelisted(actualType)
|
||||
EnumSerializer(actualType, actualClass ?: declaredClass, this)
|
||||
Enum::class.java.isAssignableFrom(actualClass ?: declaredClass) -> {
|
||||
logger.debug("class=[${actualClass?.simpleName} | $declaredClass] is an enumeration "
|
||||
+ "declaredType=${declaredType.typeName} "
|
||||
+ "isEnum=${declaredType::class.java.isEnum}")
|
||||
|
||||
serializersByType.computeIfAbsent(actualClass ?: declaredClass) {
|
||||
whitelist.requireWhitelisted(actualType)
|
||||
EnumSerializer(actualType, actualClass ?: declaredClass, this)
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
makeClassSerializer(actualClass ?: declaredClass, actualType, declaredType)
|
||||
@ -198,6 +210,7 @@ open class SerializerFactory(
|
||||
@Throws(NotSerializableException::class)
|
||||
fun get(typeDescriptor: Any, schema: SerializationSchemas): AMQPSerializer<Any> {
|
||||
return serializersByDescriptor[typeDescriptor] ?: {
|
||||
logger.trace("get Serializer descriptor=${typeDescriptor}")
|
||||
processSchema(FactorySchemaAndDescriptor(schema, typeDescriptor))
|
||||
serializersByDescriptor[typeDescriptor] ?: throw NotSerializableException(
|
||||
"Could not find type matching descriptor $typeDescriptor.")
|
||||
@ -232,16 +245,24 @@ open class SerializerFactory(
|
||||
private fun processSchema(schemaAndDescriptor: FactorySchemaAndDescriptor, sentinel: Boolean = false) {
|
||||
val metaSchema = CarpenterMetaSchema.newInstance()
|
||||
for (typeNotation in schemaAndDescriptor.schemas.schema.types) {
|
||||
logger.trace("descriptor=${schemaAndDescriptor.typeDescriptor}, typeNotation=${typeNotation.name}")
|
||||
try {
|
||||
val serialiser = processSchemaEntry(typeNotation)
|
||||
// if we just successfully built a serializer for the type but the type fingerprint
|
||||
// doesn't match that of the serialised object then we are dealing with different
|
||||
// instance of the class, as such we need to build an EvolutionSerializer
|
||||
if (serialiser.typeDescriptor != typeNotation.descriptor.name) {
|
||||
logger.info("typeNotation=${typeNotation.name} action=\"requires Evolution\"")
|
||||
getEvolutionSerializer(typeNotation, serialiser, schemaAndDescriptor.schemas)
|
||||
}
|
||||
} catch (e: ClassNotFoundException) {
|
||||
if (sentinel) throw e
|
||||
if (sentinel) {
|
||||
logger.error("typeNotation=${typeNotation.name} error=\"after Carpentry attempt failed to load\"")
|
||||
throw e
|
||||
}
|
||||
else {
|
||||
logger.info("typeNotation=\"${typeNotation.name}\" action=\"carpentry required\"")
|
||||
}
|
||||
metaSchema.buildFor(typeNotation, classloader)
|
||||
}
|
||||
}
|
||||
@ -270,8 +291,16 @@ open class SerializerFactory(
|
||||
}
|
||||
|
||||
private fun processSchemaEntry(typeNotation: TypeNotation) = when (typeNotation) {
|
||||
is CompositeType -> processCompositeType(typeNotation) // java.lang.Class (whether a class or interface)
|
||||
is RestrictedType -> processRestrictedType(typeNotation) // Collection / Map, possibly with generics
|
||||
// java.lang.Class (whether a class or interface)
|
||||
is CompositeType -> {
|
||||
logger.trace("typeNotation=${typeNotation.name} amqpType=CompositeType")
|
||||
processCompositeType(typeNotation)
|
||||
}
|
||||
// Collection / Map, possibly with generics
|
||||
is RestrictedType -> {
|
||||
logger.trace("typeNotation=${typeNotation.name} amqpType=RestrictedType")
|
||||
processRestrictedType(typeNotation)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: class loader logic, and compare the schema.
|
||||
@ -285,6 +314,7 @@ open class SerializerFactory(
|
||||
}
|
||||
|
||||
private fun makeClassSerializer(clazz: Class<*>, type: Type, declaredType: Type): AMQPSerializer<Any> = serializersByType.computeIfAbsent(type) {
|
||||
logger.debug("class=${clazz.simpleName}, type=$type is a composite type")
|
||||
if (clazz.isSynthetic) {
|
||||
// Explicitly ban synthetic classes, we have no way of recreating them when deserializing. This also
|
||||
// captures Lambda expressions and other anonymous functions
|
||||
|
@ -14,7 +14,15 @@ import java.lang.reflect.Type
|
||||
object InputStreamSerializer : CustomSerializer.Implements<InputStream>(InputStream::class.java) {
|
||||
override val revealSubclassesInSchema: Boolean = true
|
||||
|
||||
override val schemaForDocumentation = Schema(listOf(RestrictedType(type.toString(), "", listOf(type.toString()), SerializerFactory.primitiveTypeName(ByteArray::class.java)!!, descriptor, emptyList())))
|
||||
override val schemaForDocumentation = Schema(
|
||||
listOf(
|
||||
RestrictedType(
|
||||
type.toString(),
|
||||
"",
|
||||
listOf(type.toString()),
|
||||
SerializerFactory.primitiveTypeName(ByteArray::class.java)!!,
|
||||
descriptor,
|
||||
emptyList())))
|
||||
|
||||
override fun writeDescribedObject(obj: InputStream, data: Data, type: Type, output: SerializationOutput,
|
||||
context: SerializationContext
|
||||
|
@ -0,0 +1,53 @@
|
||||
package net.corda.serialization.internal.amqp
|
||||
|
||||
import net.corda.core.internal.InputStreamAndHash
|
||||
import net.corda.serialization.internal.amqp.custom.InputStreamSerializer
|
||||
import net.corda.serialization.internal.amqp.testutils.TestSerializationOutput
|
||||
import net.corda.serialization.internal.amqp.testutils.deserialize
|
||||
import net.corda.serialization.internal.amqp.testutils.testDefaultFactory
|
||||
import org.junit.Test
|
||||
import java.io.FilterInputStream
|
||||
import java.io.InputStream
|
||||
|
||||
class StreamTests {
|
||||
|
||||
private class WrapperStream(input: InputStream) : FilterInputStream(input)
|
||||
|
||||
@Test
|
||||
fun inputStream() {
|
||||
val attachment = InputStreamAndHash.createInMemoryTestZip(2116, 1)
|
||||
val id : InputStream = WrapperStream(attachment.inputStream)
|
||||
|
||||
val serializerFactory = testDefaultFactory().apply {
|
||||
register(InputStreamSerializer)
|
||||
}
|
||||
|
||||
val bytes = TestSerializationOutput(true, serializerFactory).serialize(id)
|
||||
|
||||
val deserializerFactory = testDefaultFactory().apply {
|
||||
register(InputStreamSerializer)
|
||||
}
|
||||
|
||||
DeserializationInput(serializerFactory).deserialize(bytes)
|
||||
DeserializationInput(deserializerFactory).deserialize(bytes)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun listInputStream() {
|
||||
val attachment = InputStreamAndHash.createInMemoryTestZip(2116, 1)
|
||||
val id /* : List<InputStream> */= listOf(WrapperStream(attachment.inputStream))
|
||||
|
||||
val serializerFactory = testDefaultFactory().apply {
|
||||
register(InputStreamSerializer)
|
||||
}
|
||||
|
||||
val bytes = TestSerializationOutput(true, serializerFactory).serialize(id)
|
||||
|
||||
val deserializerFactory = testDefaultFactory().apply {
|
||||
register(InputStreamSerializer)
|
||||
}
|
||||
|
||||
DeserializationInput(serializerFactory).deserialize(bytes)
|
||||
DeserializationInput(deserializerFactory).deserialize(bytes)
|
||||
}
|
||||
}
|
@ -30,6 +30,15 @@ class TestSerializationOutput(
|
||||
}
|
||||
super.writeTransformSchema(transformsSchema, data)
|
||||
}
|
||||
|
||||
@Throws(NotSerializableException::class)
|
||||
fun <T : Any> serialize(obj: T): SerializedBytes<T> {
|
||||
try {
|
||||
return _serialize(obj, testSerializationContext)
|
||||
} finally {
|
||||
andFinally()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun testName(): String = Thread.currentThread().stackTrace[2].methodName
|
||||
|
Loading…
x
Reference in New Issue
Block a user