Merge pull request #2017 from corda/kat/feature/enableAMQP

CORDA-780 - Enable AMQP for P2P and Storage Contexts
This commit is contained in:
Katelyn Baker
2017-12-11 11:37:16 +00:00
committed by GitHub
32 changed files with 567 additions and 54 deletions

View File

@ -2,6 +2,7 @@
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.cordapp.Cordapp
import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
@ -24,7 +25,8 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) {
}
}
abstract class AbstractAMQPSerializationScheme : SerializationScheme {
abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>) : SerializationScheme {
companion object {
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist
@ -62,8 +64,15 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this))
register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this))
}
for (whitelistProvider in serializationWhitelists)
for (whitelistProvider in serializationWhitelists) {
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
}
for (loader in cordappLoader) {
for (schema in loader.serializationCustomSerializers) {
factory.registerExternal(CorDappCustomSerializer(schema, factory))
}
}
}
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()
@ -97,11 +106,11 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
return SerializationOutput(serializerFactory).serialize(obj)
}
protected fun canDeserializeVersion(byteSequence: ByteSequence): Boolean = AMQP_ENABLED && byteSequence == AmqpHeaderV1_0
protected fun canDeserializeVersion(byteSequence: ByteSequence): Boolean = byteSequence == AmqpHeaderV1_0
}
// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented
class AMQPServerSerializationScheme : AbstractAMQPSerializationScheme() {
class AMQPServerSerializationScheme(cordapps: List<Cordapp> = emptyList()) : AbstractAMQPSerializationScheme(cordapps) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}
@ -118,7 +127,7 @@ class AMQPServerSerializationScheme : AbstractAMQPSerializationScheme() {
}
// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented
class AMQPClientSerializationScheme : AbstractAMQPSerializationScheme() {
class AMQPClientSerializationScheme(cordapps: List<Cordapp> = emptyList()) : AbstractAMQPSerializationScheme(cordapps) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}

View File

@ -0,0 +1,86 @@
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.nameForType
import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.codec.Data
import java.io.NotSerializableException
import java.lang.reflect.Type
import kotlin.reflect.jvm.javaType
import kotlin.reflect.jvm.jvmErasure
/**
* Index into the types list of the parent type of the serializer object, should be the
* type that this object proxies for
*/
const val CORDAPP_TYPE = 0
/**
* Index into the types list of the parent type of the serializer object, should be the
* type of the proxy object that we're using to represent the object we're proxying for
*/
const val PROXY_TYPE = 1
/**
* Wrapper class for user provided serializers
*
* Through the CorDapp JAR scanner we will have a list of custom serializer types that implement
* the toProxy and fromProxy methods. This class takes an instance of one of those objects and
* embeds it within a serialization context associated with a serializer factory by creating
* and instance of this class and registering that with a [SerializerFactory]
*
* Proxy serializers should transform an unserializable class into a representation that we can serialize
*
* @property serializer in instance of a user written serialization proxy, normally scanned and loaded
* automatically
* @property type the Java [Type] of the class which this serializes, inferred via reflection of the
* [serializer]'s super type
* @property proxyType the Java [Type] of the class into which instances of [type] are proxied for use by
* the underlying serialization engine
*
* @param factory a [SerializerFactory] belonging to the context this serializer is being instantiated
* for
*/
class CorDappCustomSerializer(
private val serializer: SerializationCustomSerializer<*, *>,
factory: SerializerFactory) : AMQPSerializer<Any>, SerializerFor {
override val revealSubclassesInSchema: Boolean get() = false
private val types = serializer::class.supertypes.filter { it.jvmErasure == SerializationCustomSerializer::class }
.flatMap { it.arguments }
.map { it.type!!.javaType }
init {
if (types.size != 2) {
throw NotSerializableException("Unable to determine serializer parent types")
}
}
override val type = types[CORDAPP_TYPE]
val proxyType = types[PROXY_TYPE]
override val typeDescriptor = Symbol.valueOf("$DESCRIPTOR_DOMAIN:${nameForType(type)}")
val descriptor: Descriptor = Descriptor(typeDescriptor)
private val proxySerializer: ObjectSerializer by lazy { ObjectSerializer(proxyType, factory) }
override fun writeClassInfo(output: SerializationOutput) {}
override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
val proxy = uncheckedCast<SerializationCustomSerializer<*, *>,
SerializationCustomSerializer<Any?, Any?>>(serializer).toProxy(obj)
data.withDescribed(descriptor) {
data.withList {
for (property in proxySerializer.propertySerializers) {
property.writeProperty(proxy, this, output)
}
}
}
}
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput) =
uncheckedCast<SerializationCustomSerializer<*, *>, SerializationCustomSerializer<Any?, Any?>>(
serializer).fromProxy(uncheckedCast(proxySerializer.readObject(obj, schemas, input)))!!
override fun isSerializerFor(clazz: Class<*>) = clazz == type
}

View File

@ -6,22 +6,27 @@ import org.apache.qpid.proton.amqp.Symbol
import org.apache.qpid.proton.codec.Data
import java.lang.reflect.Type
interface SerializerFor {
/**
* This method should return true if the custom serializer can serialize an instance of the class passed as the
* parameter.
*/
fun isSerializerFor(clazz: Class<*>): Boolean
val revealSubclassesInSchema: Boolean
}
/**
* Base class for serializers of core platform types that do not conform to the usual serialization rules and thus
* cannot be automatically serialized.
*/
abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
abstract class CustomSerializer<T : Any> : AMQPSerializer<T>, SerializerFor {
/**
* This is a collection of custom serializers that this custom serializer depends on. e.g. for proxy objects
* that refer to other custom types etc.
*/
open val additionalSerializers: Iterable<CustomSerializer<out Any>> = emptyList()
/**
* This method should return true if the custom serializer can serialize an instance of the class passed as the
* parameter.
*/
abstract fun isSerializerFor(clazz: Class<*>): Boolean
protected abstract val descriptor: Descriptor
/**
@ -33,7 +38,7 @@ abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
/**
* Whether subclasses using this serializer via inheritance should have a mapping in the schema.
*/
open val revealSubclassesInSchema: Boolean = false
override val revealSubclassesInSchema: Boolean get() = false
override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
data.withDescribed(descriptor) {

View File

@ -82,11 +82,13 @@ private fun <T : Any> propertiesForSerializationFromConstructor(kotlinConstructo
for (param in kotlinConstructor.parameters) {
val name = param.name ?: throw NotSerializableException("Constructor parameter of $clazz has no name.")
val matchingProperty = properties[name] ?:
throw NotSerializableException("No property matching constructor parameter named '$name' of '$clazz'." +
" If using Java, check that you have the -parameters option specified in the Java compiler.")
throw NotSerializableException("No property matching constructor parameter named '$name' of '$clazz'. " +
"If using Java, check that you have the -parameters option specified in the Java compiler. " +
"Alternately, provide a proxy serializer (SerializationCustomSerializer) if recompiling isn't an option")
// Check that the method has a getter in java.
val getter = matchingProperty.readMethod ?: throw NotSerializableException("Property has no getter method for $name of $clazz." +
" If using Java and the parameter name looks anonymous, check that you have the -parameters option specified in the Java compiler.")
val getter = matchingProperty.readMethod ?: throw NotSerializableException("Property has no getter method for $name of $clazz. " +
"If using Java and the parameter name looks anonymous, check that you have the -parameters option specified in the Java compiler." +
"Alternately, provide a proxy serializer (SerializationCustomSerializer) if recompiling isn't an option")
val returnType = resolveTypeVariables(getter.genericReturnType, type)
if (constructorParamTakesReturnTypeOfGetter(returnType, getter.genericReturnType, param)) {
rc += PropertySerializer.make(name, getter, returnType, factory)

View File

@ -36,7 +36,7 @@ data class FactorySchemaAndDescriptor(val schemas: SerializationSchemas, val typ
open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
private val serializersByType = ConcurrentHashMap<Type, AMQPSerializer<Any>>()
private val serializersByDescriptor = ConcurrentHashMap<Any, AMQPSerializer<Any>>()
private val customSerializers = CopyOnWriteArrayList<CustomSerializer<out Any>>()
private val customSerializers = CopyOnWriteArrayList<SerializerFor>()
val transformsCache = ConcurrentHashMap<String, EnumMap<TransformTypes, MutableList<Transform>>>()
open val classCarpenter = ClassCarpenter(cl, whitelist)
@ -196,9 +196,16 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
}
}
fun registerExternal(customSerializer: CorDappCustomSerializer) {
if (!serializersByDescriptor.containsKey(customSerializer.typeDescriptor)) {
customSerializers += customSerializer
serializersByDescriptor[customSerializer.typeDescriptor] = customSerializer
}
}
/**
* Iterate over an AMQP schema, for each type ascertain weather it's on ClassPath of [classloader] amd
* if not use the [ClassCarpenter] to generate a class to use in it's place
* Iterate over an AMQP schema, for each type ascertain whether it's on ClassPath of [classloader] and,
* if not, use the [ClassCarpenter] to generate a class to use in it's place.
*/
private fun processSchema(schemaAndDescriptor: FactorySchemaAndDescriptor, sentinel: Boolean = false) {
val metaSchema = CarpenterMetaSchema.newInstance()
@ -267,11 +274,13 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
for (customSerializer in customSerializers) {
if (customSerializer.isSerializerFor(clazz)) {
val declaredSuperClass = declaredType.asClass()?.superclass
if (declaredSuperClass == null || !customSerializer.isSerializerFor(declaredSuperClass) || !customSerializer.revealSubclassesInSchema) {
return customSerializer
return if (declaredSuperClass == null
|| !customSerializer.isSerializerFor(declaredSuperClass)
|| !customSerializer.revealSubclassesInSchema) {
customSerializer as? AMQPSerializer<Any>
} else {
// Make a subclass serializer for the subclass and return that...
return CustomSerializer.SubClass(clazz, uncheckedCast(customSerializer))
CustomSerializer.SubClass(clazz, uncheckedCast(customSerializer))
}
}
}

View File

@ -0,0 +1,148 @@
package net.corda.nodeapi.internal.serialization.amqp
import org.junit.Test
import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationCustomSerializer
import org.assertj.core.api.Assertions
import java.io.NotSerializableException
import kotlin.test.assertEquals
class CorDappSerializerTests {
data class NeedsProxy (val a: String)
class NeedsProxyProxySerializer : SerializationCustomSerializer<NeedsProxy, NeedsProxyProxySerializer.Proxy> {
data class Proxy(val proxy_a_: String)
override fun fromProxy(proxy: Proxy) = NeedsProxy(proxy.proxy_a_)
override fun toProxy(obj: NeedsProxy) = Proxy(obj.a)
}
// Standard proxy serializer used internally, here for comparison purposes
class InternalProxySerializer(factory: SerializerFactory) :
CustomSerializer.Proxy<NeedsProxy, InternalProxySerializer.Proxy> (
NeedsProxy::class.java,
InternalProxySerializer.Proxy::class.java,
factory) {
data class Proxy(val proxy_a_: String)
override fun toProxy(obj: NeedsProxy): Proxy {
return Proxy(obj.a)
}
override fun fromProxy(proxy: Proxy): NeedsProxy {
return NeedsProxy(proxy.proxy_a_)
}
}
@Test
fun `type uses proxy`() {
val internalProxyFactory = testDefaultFactory()
val proxyFactory = testDefaultFactory()
val defaultFactory = testDefaultFactory()
val msg = "help"
proxyFactory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), proxyFactory))
internalProxyFactory.register (InternalProxySerializer(internalProxyFactory))
val needsProxy = NeedsProxy(msg)
val bAndSProxy = SerializationOutput(proxyFactory).serializeAndReturnSchema (needsProxy)
val bAndSInternal = SerializationOutput(internalProxyFactory).serializeAndReturnSchema (needsProxy)
val bAndSDefault = SerializationOutput(defaultFactory).serializeAndReturnSchema (needsProxy)
val objFromDefault = DeserializationInput(defaultFactory).deserializeAndReturnEnvelope(bAndSDefault.obj)
val objFromInternal = DeserializationInput(internalProxyFactory).deserializeAndReturnEnvelope(bAndSInternal.obj)
val objFromProxy = DeserializationInput(proxyFactory).deserializeAndReturnEnvelope(bAndSProxy.obj)
assertEquals(msg, objFromDefault.obj.a)
assertEquals(msg, objFromInternal.obj.a)
assertEquals(msg, objFromProxy.obj.a)
}
@Test
fun proxiedTypeIsNested() {
data class A (val a: Int, val b: NeedsProxy)
val factory = testDefaultFactory()
factory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), factory))
val tv1 = 100
val tv2 = "pants schmants"
val bAndS = SerializationOutput(factory).serializeAndReturnSchema (A(tv1, NeedsProxy(tv2)))
val objFromDefault = DeserializationInput(factory).deserializeAndReturnEnvelope(bAndS.obj)
assertEquals(tv1, objFromDefault.obj.a)
assertEquals(tv2, objFromDefault.obj.b.a)
}
@Test
fun testWithWhitelistNotAllowed() {
data class A (val a: Int, val b: NeedsProxy)
class WL : ClassWhitelist {
private val allowedClasses = emptySet<String>()
override fun hasListed(type: Class<*>): Boolean = type.name in allowedClasses
}
val factory = SerializerFactory(WL(), ClassLoader.getSystemClassLoader())
factory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), factory))
val tv1 = 100
val tv2 = "pants schmants"
Assertions.assertThatThrownBy {
SerializationOutput(factory).serialize(A(tv1, NeedsProxy(tv2)))
}.isInstanceOf(NotSerializableException::class.java)
}
@Test
fun testWithWhitelistAllowed() {
data class A (val a: Int, val b: NeedsProxy)
class WL : ClassWhitelist {
private val allowedClasses = hashSetOf(
A::class.java.name,
NeedsProxy::class.java.name)
override fun hasListed(type: Class<*>): Boolean = type.name in allowedClasses
}
val factory = SerializerFactory(WL(), ClassLoader.getSystemClassLoader())
factory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), factory))
val tv1 = 100
val tv2 = "pants schmants"
val obj = DeserializationInput(factory).deserialize(
SerializationOutput(factory).serialize(A(tv1, NeedsProxy(tv2))))
assertEquals(tv1, obj.a)
assertEquals(tv2, obj.b.a)
}
// The custom type not being whitelisted won't matter here because the act of adding a
// custom serializer bypasses the whitelist
@Test
fun testWithWhitelistAllowedOuterOnly() {
data class A (val a: Int, val b: NeedsProxy)
class WL : ClassWhitelist {
// explicitly don't add NeedsProxy
private val allowedClasses = hashSetOf(A::class.java.name)
override fun hasListed(type: Class<*>): Boolean = type.name in allowedClasses
}
val factory = SerializerFactory(WL(), ClassLoader.getSystemClassLoader())
factory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), factory))
val tv1 = 100
val tv2 = "pants schmants"
val obj = DeserializationInput(factory).deserialize(
SerializationOutput(factory).serialize(A(tv1, NeedsProxy(tv2))))
assertEquals(tv1, obj.a)
assertEquals(tv2, obj.b.a)
}
}

View File

@ -25,7 +25,7 @@ class OverridePKSerializerTest {
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
}
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme() {
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
}

View File

@ -593,7 +593,7 @@ class SerializationOutputTests {
fun `test transaction state`() {
val state = TransactionState(FooState(), FOO_PROGRAM_ID, MEGA_CORP)
val scheme = AMQPServerSerializationScheme()
val scheme = AMQPServerSerializationScheme(emptyList())
val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" }
.java.getDeclaredMethod("registerCustomSerializers", SerializerFactory::class.java)
func.isAccessible = true