mirror of
https://github.com/corda/corda.git
synced 2025-06-13 04:38:19 +00:00
CORDA-780 / CORDA-786 - Enable AMQP for P2P and Storage Contexts
Add plugable mechanism for CorDapps such that they can add their own custom serializers
This commit is contained in:
@ -2,6 +2,7 @@
|
||||
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.cordapp.Cordapp
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.nodeapi.internal.serialization.DefaultWhitelist
|
||||
@ -24,7 +25,8 @@ fun SerializerFactory.addToWhitelist(vararg types: Class<*>) {
|
||||
}
|
||||
}
|
||||
|
||||
abstract class AbstractAMQPSerializationScheme : SerializationScheme {
|
||||
abstract class AbstractAMQPSerializationScheme(val cordappLoader: List<Cordapp>) : SerializationScheme {
|
||||
|
||||
companion object {
|
||||
private val serializationWhitelists: List<SerializationWhitelist> by lazy {
|
||||
ServiceLoader.load(SerializationWhitelist::class.java, this::class.java.classLoader).toList() + DefaultWhitelist
|
||||
@ -62,8 +64,15 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this))
|
||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this))
|
||||
}
|
||||
for (whitelistProvider in serializationWhitelists)
|
||||
for (whitelistProvider in serializationWhitelists) {
|
||||
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
|
||||
}
|
||||
|
||||
cordappLoader.forEach { ca ->
|
||||
ca.serializationCustomSerializers.forEach {
|
||||
factory.registerExternal(CorDappCustomSerializer(it.newInstance(), factory))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()
|
||||
@ -97,11 +106,11 @@ abstract class AbstractAMQPSerializationScheme : SerializationScheme {
|
||||
return SerializationOutput(serializerFactory).serialize(obj)
|
||||
}
|
||||
|
||||
protected fun canDeserializeVersion(byteSequence: ByteSequence): Boolean = AMQP_ENABLED && byteSequence == AmqpHeaderV1_0
|
||||
protected fun canDeserializeVersion(byteSequence: ByteSequence): Boolean = byteSequence == AmqpHeaderV1_0
|
||||
}
|
||||
|
||||
// TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented
|
||||
class AMQPServerSerializationScheme : AbstractAMQPSerializationScheme() {
|
||||
class AMQPServerSerializationScheme(cordapps: List<Cordapp> = emptyList()) : AbstractAMQPSerializationScheme(cordapps) {
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
@ -118,7 +127,7 @@ class AMQPServerSerializationScheme : AbstractAMQPSerializationScheme() {
|
||||
}
|
||||
|
||||
// TODO: This will eventually cover client RPC as well and move to client module, but for now this is not implemented
|
||||
class AMQPClientSerializationScheme : AbstractAMQPSerializationScheme() {
|
||||
class AMQPClientSerializationScheme(cordapps: List<Cordapp> = emptyList()) : AbstractAMQPSerializationScheme(cordapps) {
|
||||
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
@ -0,0 +1,41 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
import net.corda.nodeapi.internal.serialization.amqp.SerializerFactory.Companion.nameForType
|
||||
import org.apache.qpid.proton.amqp.Symbol
|
||||
import org.apache.qpid.proton.codec.Data
|
||||
import java.lang.reflect.Type
|
||||
|
||||
class CorDappCustomSerializer(
|
||||
private val serialiser: SerializationCustomSerializer,
|
||||
factory: SerializerFactory)
|
||||
: AMQPSerializer<Any>, SerializerFor {
|
||||
override val revealSubclassesInSchema: Boolean get() = false
|
||||
override val type: Type get() = serialiser.type
|
||||
override val typeDescriptor = Symbol.valueOf("$DESCRIPTOR_DOMAIN:${nameForType(type)}")
|
||||
val descriptor: Descriptor = Descriptor(typeDescriptor)
|
||||
|
||||
private val proxySerializer: ObjectSerializer by lazy { ObjectSerializer(serialiser.ptype, factory) }
|
||||
|
||||
override fun writeClassInfo(output: SerializationOutput) {}
|
||||
|
||||
override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
|
||||
val proxy = serialiser.toProxy(obj)
|
||||
|
||||
data.withDescribed(descriptor) {
|
||||
data.withList {
|
||||
for (property in proxySerializer.propertySerializers) {
|
||||
property.writeProperty(proxy, this, output)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any {
|
||||
return serialiser.fromProxy(uncheckedCast(proxySerializer.readObject(obj, schema, input)))
|
||||
}
|
||||
|
||||
override fun isSerializerFor(clazz: Class<*>): Boolean = clazz == type
|
||||
}
|
||||
|
@ -6,22 +6,27 @@ import org.apache.qpid.proton.amqp.Symbol
|
||||
import org.apache.qpid.proton.codec.Data
|
||||
import java.lang.reflect.Type
|
||||
|
||||
interface SerializerFor {
|
||||
/**
|
||||
* This method should return true if the custom serializer can serialize an instance of the class passed as the
|
||||
* parameter.
|
||||
*/
|
||||
fun isSerializerFor(clazz: Class<*>): Boolean
|
||||
|
||||
val revealSubclassesInSchema: Boolean
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for serializers of core platform types that do not conform to the usual serialization rules and thus
|
||||
* cannot be automatically serialized.
|
||||
*/
|
||||
abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
|
||||
abstract class CustomSerializer<T : Any> : AMQPSerializer<T>, SerializerFor {
|
||||
/**
|
||||
* This is a collection of custom serializers that this custom serializer depends on. e.g. for proxy objects
|
||||
* that refer to other custom types etc.
|
||||
*/
|
||||
open val additionalSerializers: Iterable<CustomSerializer<out Any>> = emptyList()
|
||||
|
||||
/**
|
||||
* This method should return true if the custom serializer can serialize an instance of the class passed as the
|
||||
* parameter.
|
||||
*/
|
||||
abstract fun isSerializerFor(clazz: Class<*>): Boolean
|
||||
|
||||
protected abstract val descriptor: Descriptor
|
||||
/**
|
||||
@ -33,7 +38,7 @@ abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
|
||||
/**
|
||||
* Whether subclasses using this serializer via inheritance should have a mapping in the schema.
|
||||
*/
|
||||
open val revealSubclassesInSchema: Boolean = false
|
||||
override val revealSubclassesInSchema: Boolean get() = false
|
||||
|
||||
override fun writeObject(obj: Any, data: Data, type: Type, output: SerializationOutput) {
|
||||
data.withDescribed(descriptor) {
|
||||
|
@ -82,11 +82,13 @@ private fun <T : Any> propertiesForSerializationFromConstructor(kotlinConstructo
|
||||
for (param in kotlinConstructor.parameters) {
|
||||
val name = param.name ?: throw NotSerializableException("Constructor parameter of $clazz has no name.")
|
||||
val matchingProperty = properties[name] ?:
|
||||
throw NotSerializableException("No property matching constructor parameter named '$name' of '$clazz'." +
|
||||
" If using Java, check that you have the -parameters option specified in the Java compiler.")
|
||||
throw NotSerializableException("No property matching constructor parameter named '$name' of '$clazz'. " +
|
||||
"If using Java, check that you have the -parameters option specified in the Java compiler. " +
|
||||
"Alternately, provide a proxy serializer (SerializationCustomSerializer) if recompiling isn't an option")
|
||||
// Check that the method has a getter in java.
|
||||
val getter = matchingProperty.readMethod ?: throw NotSerializableException("Property has no getter method for $name of $clazz." +
|
||||
" If using Java and the parameter name looks anonymous, check that you have the -parameters option specified in the Java compiler.")
|
||||
val getter = matchingProperty.readMethod ?: throw NotSerializableException("Property has no getter method for $name of $clazz. " +
|
||||
"If using Java and the parameter name looks anonymous, check that you have the -parameters option specified in the Java compiler." +
|
||||
"Alternately, provide a proxy serializer (SerializationCustomSerializer) if recompiling isn't an option")
|
||||
val returnType = resolveTypeVariables(getter.genericReturnType, type)
|
||||
if (constructorParamTakesReturnTypeOfGetter(returnType, getter.genericReturnType, param)) {
|
||||
rc += PropertySerializer.make(name, getter, returnType, factory)
|
||||
|
@ -36,7 +36,7 @@ data class FactorySchemaAndDescriptor(val schemas: SerializationSchemas, val typ
|
||||
open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
|
||||
private val serializersByType = ConcurrentHashMap<Type, AMQPSerializer<Any>>()
|
||||
private val serializersByDescriptor = ConcurrentHashMap<Any, AMQPSerializer<Any>>()
|
||||
private val customSerializers = CopyOnWriteArrayList<CustomSerializer<out Any>>()
|
||||
private val customSerializers = CopyOnWriteArrayList<SerializerFor>()
|
||||
val transformsCache = ConcurrentHashMap<String, EnumMap<TransformTypes, MutableList<Transform>>>()
|
||||
|
||||
open val classCarpenter = ClassCarpenter(cl, whitelist)
|
||||
@ -196,6 +196,13 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
|
||||
}
|
||||
}
|
||||
|
||||
fun registerExternal(customSerializer: CorDappCustomSerializer) {
|
||||
if (!serializersByDescriptor.containsKey(customSerializer.typeDescriptor)) {
|
||||
customSerializers += customSerializer
|
||||
serializersByDescriptor[customSerializer.typeDescriptor] = customSerializer
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterate over an AMQP schema, for each type ascertain weather it's on ClassPath of [classloader] amd
|
||||
* if not use the [ClassCarpenter] to generate a class to use in it's place
|
||||
@ -267,11 +274,13 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
|
||||
for (customSerializer in customSerializers) {
|
||||
if (customSerializer.isSerializerFor(clazz)) {
|
||||
val declaredSuperClass = declaredType.asClass()?.superclass
|
||||
if (declaredSuperClass == null || !customSerializer.isSerializerFor(declaredSuperClass) || !customSerializer.revealSubclassesInSchema) {
|
||||
return customSerializer
|
||||
return if (declaredSuperClass == null
|
||||
|| !customSerializer.isSerializerFor(declaredSuperClass)
|
||||
|| !customSerializer.revealSubclassesInSchema) {
|
||||
customSerializer as? AMQPSerializer<Any>
|
||||
} else {
|
||||
// Make a subclass serializer for the subclass and return that...
|
||||
return CustomSerializer.SubClass(clazz, uncheckedCast(customSerializer))
|
||||
CustomSerializer.SubClass(clazz, uncheckedCast(customSerializer))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,92 @@
|
||||
package net.corda.nodeapi.internal.serialization.amqp
|
||||
|
||||
import org.junit.Test
|
||||
import net.corda.core.serialization.CordaCustomSerializer
|
||||
import net.corda.core.serialization.CordaCustomSerializerProxy
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
import java.lang.reflect.Type
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class CorDappSerializerTests {
|
||||
data class NeedsProxy (val a: String)
|
||||
|
||||
@CordaCustomSerializer
|
||||
class NeedsProxyProxySerializer : SerializationCustomSerializer {
|
||||
@CordaCustomSerializerProxy
|
||||
data class Proxy(val proxy_a_: String)
|
||||
|
||||
override val type: Type get() = NeedsProxy::class.java
|
||||
override val ptype: Type get() = Proxy::class.java
|
||||
|
||||
override fun fromProxy(proxy: Any) : Any {
|
||||
println ("NeedsProxyProxySerialiser - fromProxy")
|
||||
return NeedsProxy((proxy as Proxy).proxy_a_)
|
||||
}
|
||||
override fun toProxy(obj: Any) : Any {
|
||||
println ("NeedsProxyProxySerialiser - to Proxy")
|
||||
return Proxy((obj as NeedsProxy).a)
|
||||
}
|
||||
}
|
||||
|
||||
// Standard proxy serialiser used internally, here for comparison purposes
|
||||
class InternalProxySerialiser(factory: SerializerFactory) :
|
||||
CustomSerializer.Proxy<NeedsProxy, InternalProxySerialiser.Proxy> (
|
||||
NeedsProxy::class.java,
|
||||
InternalProxySerialiser.Proxy::class.java,
|
||||
factory) {
|
||||
data class Proxy(val proxy_a_: String)
|
||||
|
||||
override fun toProxy(obj: NeedsProxy): Proxy {
|
||||
println ("InternalProxySerialiser - toProxy")
|
||||
return Proxy(obj.a)
|
||||
}
|
||||
|
||||
override fun fromProxy(proxy: Proxy): NeedsProxy {
|
||||
println ("InternalProxySerialiser - fromProxy")
|
||||
return NeedsProxy(proxy.proxy_a_)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `type uses proxy`() {
|
||||
val internalProxyFactory = testDefaultFactory()
|
||||
val proxyFactory = testDefaultFactory()
|
||||
val defaultFactory = testDefaultFactory()
|
||||
|
||||
val msg = "help"
|
||||
|
||||
proxyFactory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), proxyFactory))
|
||||
internalProxyFactory.register (InternalProxySerialiser(internalProxyFactory))
|
||||
|
||||
val needsProxy = NeedsProxy(msg)
|
||||
|
||||
val bAndSProxy = SerializationOutput(proxyFactory).serializeAndReturnSchema (needsProxy)
|
||||
val bAndSInternal = SerializationOutput(internalProxyFactory).serializeAndReturnSchema (needsProxy)
|
||||
val bAndSDefault = SerializationOutput(defaultFactory).serializeAndReturnSchema (needsProxy)
|
||||
|
||||
val objFromDefault = DeserializationInput(defaultFactory).deserializeAndReturnEnvelope(bAndSDefault.obj)
|
||||
val objFromInternal = DeserializationInput(internalProxyFactory).deserializeAndReturnEnvelope(bAndSInternal.obj)
|
||||
val objFromProxy = DeserializationInput(proxyFactory).deserializeAndReturnEnvelope(bAndSProxy.obj)
|
||||
|
||||
assertEquals(msg, objFromDefault.obj.a)
|
||||
assertEquals(msg, objFromInternal.obj.a)
|
||||
assertEquals(msg, objFromProxy.obj.a)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun proxiedTypeIsNested() {
|
||||
data class A (val a: Int, val b: NeedsProxy)
|
||||
|
||||
val factory = testDefaultFactory()
|
||||
factory.registerExternal (CorDappCustomSerializer(NeedsProxyProxySerializer(), factory))
|
||||
|
||||
val tv1 = 100
|
||||
val tv2 = "pants schmants"
|
||||
val bAndS = SerializationOutput(factory).serializeAndReturnSchema (A(tv1, NeedsProxy(tv2)))
|
||||
|
||||
val objFromDefault = DeserializationInput(factory).deserializeAndReturnEnvelope(bAndS.obj)
|
||||
|
||||
assertEquals(tv1, objFromDefault.obj.a)
|
||||
assertEquals(tv2, objFromDefault.obj.b.a)
|
||||
}
|
||||
}
|
@ -25,7 +25,7 @@ class OverridePKSerializerTest {
|
||||
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme() {
|
||||
class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptyList()) {
|
||||
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ class SerializationOutputTests {
|
||||
fun `test transaction state`() {
|
||||
val state = TransactionState(FooState(), FOO_PROGRAM_ID, MEGA_CORP)
|
||||
|
||||
val scheme = AMQPServerSerializationScheme()
|
||||
val scheme = AMQPServerSerializationScheme(emptyList())
|
||||
val func = scheme::class.superclasses.single { it.simpleName == "AbstractAMQPSerializationScheme" }
|
||||
.java.getDeclaredMethod("registerCustomSerializers", SerializerFactory::class.java)
|
||||
func.isAccessible = true
|
||||
|
Reference in New Issue
Block a user