[CORDA-3501] - Provide option for user to specify custom serializers (#5837)

* [CORDA-3501] - Provide option for user to specify custom serializers

* Remove not needed integration test shemas
This commit is contained in:
Dimos Raptis 2020-01-09 11:29:16 +00:00 committed by Rick Parker
parent d0543d7270
commit ce774e459a
7 changed files with 169 additions and 16 deletions

View File

@ -0,0 +1,96 @@
package net.corda.client.rpc
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.internal._driverSerializationEnv
import net.corda.core.serialization.internal._rpcClientSerializationEnv
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
class RpcCustomSerializersTest {
@Test
fun `when custom serializers are not provided, the classpath is scanned to identify any existing ones`() {
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val server = startNode(providedName = ALICE_NAME).get()
withSerializationEnvironmentsReset {
val client = CordaRPCClient(hostAndPort = server.rpcAddress)
val serializers = client.getRegisteredCustomSerializers()
assertThat(serializers).hasSize(1)
assertThat(serializers).hasOnlyElementsOfType(MySerializer::class.java)
}
}
}
@Test
fun `when an empty set of custom serializers is provided, no scanning is performed and this empty set is used instead`() {
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val server = startNode(providedName = ALICE_NAME).get()
withSerializationEnvironmentsReset {
val client = CordaRPCClient(hostAndPort = server.rpcAddress, customSerializers = emptySet())
val serializers = client.getRegisteredCustomSerializers()
assertThat(serializers).isEmpty()
}
}
}
@Test
fun `when a set of custom serializers is explicitly provided, these are used instead of scanning the classpath`() {
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptyList())) {
val server = startNode(providedName = ALICE_NAME).get()
withSerializationEnvironmentsReset {
val client = CordaRPCClient(hostAndPort = server.rpcAddress, customSerializers = setOf(MySerializer()))
val serializers = client.getRegisteredCustomSerializers()
assertThat(serializers).hasSize(1)
assertThat(serializers).hasOnlyElementsOfType(MySerializer::class.java)
}
}
}
/**
* This is done to avoid re-using the serialization environment setup by the driver the same way
* it will happen when a client is initialised outside a node.
*/
private fun withSerializationEnvironmentsReset(block: () -> Unit) {
val driverSerializationEnv = _driverSerializationEnv.get()
try {
if (driverSerializationEnv != null) {
_driverSerializationEnv.set(null)
}
block()
} finally {
if (driverSerializationEnv != null) {
_driverSerializationEnv.set(driverSerializationEnv)
}
if (_rpcClientSerializationEnv.get() != null) {
_rpcClientSerializationEnv.set(null)
}
}
}
class MySerializer: SerializationCustomSerializer<MyMagicClass, MySerializer.Proxy> {
class Proxy(val magicObject: MyMagicClass)
override fun fromProxy(proxy: Proxy): MyMagicClass {
return MyMagicClass(proxy.magicObject.someField)
}
override fun toProxy(obj: MyMagicClass): Proxy {
return Proxy(obj)
}
}
class MyMagicClass(val someField: String)
}

View File

@ -10,6 +10,7 @@ import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.createInstancesOfClassesImplementing
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.ClientRpcSslOptions
@ -19,10 +20,12 @@ import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import net.corda.serialization.internal.SerializationFactoryImpl
import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
@ -339,16 +342,19 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, the client will round-robin from the beginning of the list and try all servers.
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s
* and [SerializationWhitelist]s
* If the created RPC client is intended to use types with custom serializers / whitelists,
* a classloader will need to be provided that contains the associated CorDapp jars.
* and [SerializationWhitelist]s. If no classloader is provided, the classloader of the current class will be used by default
* for the aforementioned discovery process.
* @param customSerializers a set of [SerializationCustomSerializer]s to be used. If this parameter is specified, then no classpath scanning
* will be performed for custom serializers, the provided ones will be used instead. This parameter serves as a more user-friendly option
* to specify your serializers and disable the classpath scanning (e.g. for performance reasons).
*/
class CordaRPCClient private constructor(
private val hostAndPort: NetworkHostAndPort?,
private val haAddressPool: List<NetworkHostAndPort>,
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
private val sslConfiguration: ClientRpcSslOptions? = null,
private val classLoader: ClassLoader? = null
private val classLoader: ClassLoader? = null,
private val customSerializers: Set<SerializationCustomSerializer<*, *>>? = null
) {
@JvmOverloads
@ -411,30 +417,74 @@ class CordaRPCClient private constructor(
classLoader = classLoader
)
@JvmOverloads
constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
customSerializers: Set<SerializationCustomSerializer<*, *>>?
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader,
customSerializers = customSerializers
)
@JvmOverloads
constructor(
haAddressPool: List<NetworkHostAndPort>,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
customSerializers: Set<SerializationCustomSerializer<*, *>>?
) : this(
hostAndPort = null,
haAddressPool = haAddressPool,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader,
customSerializers = customSerializers
)
// Here to keep the keep ABI compatibility happy
companion object {}
@CordaInternal
@VisibleForTesting
fun getRegisteredCustomSerializers(): List<SerializationCustomSerializer<*, *>> {
return (effectiveSerializationEnv.serializationFactory as SerializationFactoryImpl).getRegisteredSchemes()
.filterIsInstance<AMQPClientSerializationScheme>()
.flatMap { it.getRegisteredCustomSerializers() }
}
init {
try {
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
val cache = Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap()
val cache = Caffeine.newBuilder().maximumSize(128)
.build<SerializationFactoryCacheKey, SerializerFactory>().asMap()
// If the client has explicitly provided a classloader use this one to scan for custom serializers,
// otherwise use the current one.
val serializationClassLoader = this.classLoader ?: this.javaClass.classLoader
val customSerializers = createInstancesOfClassesImplementing(
// If the client has explicitly provided a set of custom serializers, avoid performing any scanning and use these instead.
val discoveredCustomSerializers = customSerializers ?: createInstancesOfClassesImplementing(
serializationClassLoader,
SerializationCustomSerializer::class.java
)
val serializationWhitelists = ServiceLoader.load(
SerializationWhitelist::class.java,
serializationClassLoader
).toSet()
AMQPClientSerializationScheme.initialiseSerialization(
serializationClassLoader,
customSerializers,
discoveredCustomSerializers,
serializationWhitelists,
cache
)

View File

@ -7,6 +7,7 @@ import net.corda.core.serialization.SerializationContext.UseCase
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal._rpcClientSerializationEnv
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.serialization.internal.*
import net.corda.serialization.internal.amqp.*
@ -35,7 +36,8 @@ class AMQPClientSerializationScheme(
companion object {
/** Call from main only. */
fun initialiseSerialization(classLoader: ClassLoader? = null, customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet(), serializationWhitelists: Set<SerializationWhitelist> = emptySet(), serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()) {
nodeSerializationEnv = createSerializationEnv(classLoader, customSerializers, serializationWhitelists, serializerFactoriesForContexts)
_rpcClientSerializationEnv.set(createSerializationEnv(classLoader, customSerializers,
serializationWhitelists, serializerFactoriesForContexts))
}
fun createSerializationEnv(classLoader: ClassLoader? = null, customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet(), serializationWhitelists: Set<SerializationWhitelist> = emptySet(), serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()): SerializationEnvironment {

View File

@ -75,6 +75,8 @@ var nodeSerializationEnv by _nodeSerializationEnv
val _driverSerializationEnv = SimpleToggleField<SerializationEnvironment>("driverSerializationEnv")
val _rpcClientSerializationEnv = SimpleToggleField<SerializationEnvironment>("rpcClientSerializationEnv")
val _contextSerializationEnv = ThreadLocalToggleField<SerializationEnvironment>("contextSerializationEnv")
val _inheritableContextSerializationEnv = InheritableThreadLocalToggleField<SerializationEnvironment>("inheritableContextSerializationEnv") { stack ->
@ -89,7 +91,8 @@ private val serializationEnvFields = listOf(
_nodeSerializationEnv,
_driverSerializationEnv,
_contextSerializationEnv,
_inheritableContextSerializationEnv
_inheritableContextSerializationEnv,
_rpcClientSerializationEnv
)
val _allEnabledSerializationEnvs: List<Pair<String, SerializationEnvironment>>

View File

@ -120,7 +120,6 @@
<ID>ComplexMethod:CheckpointDumper.kt$CheckpointDumper$fun dump()</ID>
<ID>ComplexMethod:CheckpointDumper.kt$CheckpointDumper$private fun FlowIORequest&lt;*&gt;.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn</ID>
<ID>ComplexMethod:ClassCarpenter.kt$ClassCarpenterImpl$ private fun validateSchema(schema: Schema)</ID>
<ID>ComplexMethod:CollectSignaturesFlow.kt$CollectSignaturesFlow$@Suspendable override fun call(): SignedTransaction</ID>
<ID>ComplexMethod:CompatibleTransactionTests.kt$CompatibleTransactionTests$@Test fun `Command visibility tests`()</ID>
<ID>ComplexMethod:ConfigUtilities.kt$// For Iterables figure out the type parameter and apply the same logic as above on the individual elements. private fun Iterable&lt;*&gt;.toConfigIterable(field: Field): Iterable&lt;Any?&gt;</ID>
<ID>ComplexMethod:ConfigUtilities.kt$// TODO Move this to KeyStoreConfigHelpers. fun MutualSslConfiguration.configureDevKeyAndTrustStores(myLegalName: CordaX500Name, signingCertificateStore: FileBasedCertificateStoreSupplier, certificatesDirectory: Path, cryptoService: CryptoService? = null)</ID>
@ -1262,7 +1261,6 @@
<ID>MaxLineLength:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme$return SerializerFactoryBuilder.build(context.whitelist, context.deserializationClassLoader, context.lenientCarpenterEnabled).apply { register(RpcClientObservableDeSerializer) register(RpcClientCordaFutureSerializer(this)) register(RxNotificationSerializer(this)) }</ID>
<ID>MaxLineLength:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$ fun initialiseSerialization(classLoader: ClassLoader? = null, customSerializers: Set&lt;SerializationCustomSerializer&lt;*, *&gt;&gt; = emptySet(), serializationWhitelists: Set&lt;SerializationWhitelist&gt; = emptySet(), serializerFactoriesForContexts: MutableMap&lt;SerializationFactoryCacheKey, SerializerFactory&gt; = AccessOrderLinkedHashMap&lt;SerializationFactoryCacheKey, SerializerFactory&gt;(128).toSynchronised())</ID>
<ID>MaxLineLength:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$fun createSerializationEnv(classLoader: ClassLoader? = null, customSerializers: Set&lt;SerializationCustomSerializer&lt;*, *&gt;&gt; = emptySet(), serializationWhitelists: Set&lt;SerializationWhitelist&gt; = emptySet(), serializerFactoriesForContexts: MutableMap&lt;SerializationFactoryCacheKey, SerializerFactory&gt; = AccessOrderLinkedHashMap&lt;SerializationFactoryCacheKey, SerializerFactory&gt;(128).toSynchronised()): SerializationEnvironment</ID>
<ID>MaxLineLength:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$nodeSerializationEnv = createSerializationEnv(classLoader, customSerializers, serializationWhitelists, serializerFactoriesForContexts)</ID>
<ID>MaxLineLength:AMQPClientSerializationScheme.kt$AMQPClientSerializationScheme.Companion$registerScheme(AMQPClientSerializationScheme(customSerializers, serializationWhitelists, serializerFactoriesForContexts))</ID>
<ID>MaxLineLength:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme$val key = SerializationFactoryCacheKey(context.whitelist, context.deserializationClassLoader, context.preventDataLoss, context.customSerializers)</ID>
<ID>MaxLineLength:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme${ // This is a hack introduced in version 3 to fix a spring boot issue - CORDA-1747. // It breaks the shell because it overwrites the CordappClassloader with the system classloader that doesn't know about any CorDapps. // In case a spring boot serialization issue with generics is found, a better solution needs to be found to address it. // var contextToUse = context // if (context.useCase == SerializationContext.UseCase.RPCClient) { // contextToUse = context.withClassLoader(getContextClassLoader()) // } val serializerFactory = getSerializerFactory(context) return DeserializationInput(serializerFactory).deserialize(byteSequence, clazz, context) }</ID>
@ -1621,9 +1619,7 @@
<ID>MaxLineLength:ClockUtilsTest.kt$ClockUtilsTest$assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant().minus(1.hours), future), "Should have reached deadline")</ID>
<ID>MaxLineLength:ClockUtilsTest.kt$ClockUtilsTest$assertTrue(NodeSchedulerService.awaitWithDeadline(stoppedClock, advancedClock.instant(), future), "Should not have reached deadline")</ID>
<ID>MaxLineLength:CollectSignaturesFlow.kt$CollectSignatureFlow : FlowLogic</ID>
<ID>MaxLineLength:CollectSignaturesFlow.kt$CollectSignaturesFlow$else -&gt; throw IllegalArgumentException("Signatures can only be collected from Party or AnonymousParty, not $destination")</ID>
<ID>MaxLineLength:CollectSignaturesFlow.kt$CollectSignaturesFlow$override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()</ID>
<ID>MaxLineLength:CollectSignaturesFlow.kt$CollectSignaturesFlow$val wellKnownParty = checkNotNull(keyToSigningParty[it]) { "Could not find a session or wellKnown party for key ${it.toStringShort()}" }</ID>
<ID>MaxLineLength:CollectSignaturesFlow.kt$SignTransactionFlow$override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()</ID>
<ID>MaxLineLength:CollectSignaturesFlowTests.kt$CollectSignaturesFlowTests.TestFlow.Initiator$val sessions = excludeHostNode(serviceHub, groupAbstractPartyByWellKnownParty(serviceHub, state.owners)).map { initiateFlow(it.key) }</ID>
<ID>MaxLineLength:CollectionSerializer.kt$CollectionSerializer$private val typeNotation: TypeNotation = RestrictedType(AMQPTypeIdentifiers.nameForType(declaredType), null, emptyList(), "list", Descriptor(typeDescriptor), emptyList())</ID>
@ -4224,6 +4220,7 @@
<ID>TopLevelPropertyNaming:SerializationEnvironment.kt$val _contextSerializationEnv = ThreadLocalToggleField&lt;SerializationEnvironment&gt;("contextSerializationEnv")</ID>
<ID>TopLevelPropertyNaming:SerializationEnvironment.kt$val _driverSerializationEnv = SimpleToggleField&lt;SerializationEnvironment&gt;("driverSerializationEnv")</ID>
<ID>TopLevelPropertyNaming:SerializationEnvironment.kt$val _inheritableContextSerializationEnv = InheritableThreadLocalToggleField&lt;SerializationEnvironment&gt;("inheritableContextSerializationEnv") { stack -&gt; stack.fold(false) { isAGlobalThreadBeingCreated, e -&gt; isAGlobalThreadBeingCreated || (e.className == "io.netty.util.concurrent.GlobalEventExecutor" &amp;&amp; e.methodName == "startThread") || (e.className == "java.util.concurrent.ForkJoinPool\$DefaultForkJoinWorkerThreadFactory" &amp;&amp; e.methodName == "newThread") } }</ID>
<ID>TopLevelPropertyNaming:SerializationEnvironment.kt$val _rpcClientSerializationEnv = SimpleToggleField&lt;SerializationEnvironment&gt;("rpcClientSerializationEnv")</ID>
<ID>TopLevelPropertyNaming:SerializationFormat.kt$const val encodingNotPermittedFormat = "Encoding not permitted: %s"</ID>
<ID>VariableNaming:AttachmentsClassLoaderSerializationTests.kt$AttachmentsClassLoaderSerializationTests$val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party</ID>
<ID>VariableNaming:AttachmentsClassLoaderSerializationTests.kt$AttachmentsClassLoaderSerializationTests$val MEGA_CORP = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party</ID>
@ -4432,9 +4429,6 @@
<ID>WildcardImport:ClassCarpenterTestUtils.kt$import net.corda.serialization.internal.amqp.*</ID>
<ID>WildcardImport:ClassCarpenterTestUtils.kt$import net.corda.serialization.internal.model.*</ID>
<ID>WildcardImport:CloseableTab.kt$import tornadofx.*</ID>
<ID>WildcardImport:CollectSignaturesFlowTests.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:CollectSignaturesFlowTests.kt$import net.corda.core.identity.*</ID>
<ID>WildcardImport:CollectSignaturesFlowTests.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:CommandLineCompatibilityCheckerTest.kt$import org.hamcrest.CoreMatchers.*</ID>
<ID>WildcardImport:CommercialPaper.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:CommercialPaperIssueFlow.kt$import net.corda.core.flows.*</ID>

View File

@ -3,6 +3,7 @@ package net.corda.serialization.internal
import net.corda.core.DeleteForDJVM
import net.corda.core.KeepForDJVM
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.copyBytes
import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
@ -81,6 +82,9 @@ open class SerializationFactoryImpl(
val magicSize = amqpMagic.size
}
@VisibleForTesting
fun getRegisteredSchemes() = registeredSchemes
private val creator: List<StackTraceElement> = Exception().stackTrace.asList()
private val registeredSchemes: MutableCollection<SerializationScheme> = Collections.synchronizedCollection(mutableListOf())

View File

@ -6,6 +6,7 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.KeepForDJVM
import net.corda.core.StubOutForDJVM
import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.toSynchronised
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.*
@ -53,6 +54,9 @@ abstract class AbstractAMQPSerializationScheme(
AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()
)
@VisibleForTesting
fun getRegisteredCustomSerializers() = cordappCustomSerializers
// This is a bit gross but a broader check for ConcurrentMap is not allowed inside DJVM.
private val serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> =
if (maybeNotConcurrentSerializerFactoriesForContexts is