[CORDA-2390] - Add whitelists and custom serializers from cordapps to serialization … (#4551)

* Add whitelists and custom serializers from cordapps to serialization context

* Remove changes in TransactionBuilder, add caching

* Add whitelists and custom serializers from cordapps to serialization context

* Remove changes in TransactionBuilder, add caching

* Address comments

* Increase node memory for SIMM integration test

* Cache only serialization context

* Increase integ test timeout

* Fix API breakage

* Increase max heap size for web server integ test

* Move classloading utils from separate module to core.internal

* Adjust heap size for more integ tests

* Increase time window for IRS demo transactions

* Fix determinator

* Add parameter in core-deterministic

* Stub out class-loading method for DJVM
This commit is contained in:
Dimos Raptis 2019-01-13 20:15:05 +00:00 committed by Gavin Thomas
parent 36cd9b9791
commit 5b34020e59
37 changed files with 278 additions and 124 deletions

View File

@ -3,12 +3,15 @@ package net.corda.client.rpc
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.internal.loadClassesImplementing
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
@ -19,6 +22,7 @@ import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
import java.util.ServiceLoader
/**
* This class is essentially just a wrapper for an RPCConnection<CordaRPCOps> and can be treated identically.
@ -240,6 +244,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* @param configuration An optional configuration used to tweak client behaviour.
* @param sslConfiguration An optional [ClientRpcSslOptions] used to enable secure communication with the server.
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s and [SerializationWhitelist]s
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, the client will round-robin from the beginning of the list and try all servers.
*/
@ -252,8 +257,9 @@ class CordaRPCClient private constructor(
) {
@JvmOverloads
constructor(hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT)
: this(hostAndPort, configuration, null)
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
classLoader: ClassLoader? = null)
: this(hostAndPort, configuration, null, classLoader = classLoader)
/**
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
@ -287,7 +293,7 @@ class CordaRPCClient private constructor(
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
): CordaRPCClient {
return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader)
return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader = classLoader)
}
}
@ -296,7 +302,14 @@ class CordaRPCClient private constructor(
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
AMQPClientSerializationScheme.initialiseSerialization(classLoader, Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap())
// If the client has provided a classloader, the associated classpath is checked for available custom serializers and serialization whitelists.
if (classLoader != null) {
val customSerializers = loadClassesImplementing(classLoader, SerializationCustomSerializer::class.java)
val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, classLoader).toSet()
AMQPClientSerializationScheme.initialiseSerialization(classLoader, customSerializers, serializationWhitelists, Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap())
} else {
AMQPClientSerializationScheme.initialiseSerialization(classLoader, serializerFactoriesForContexts = Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap())
}
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}

View File

@ -5,6 +5,7 @@ import net.corda.core.internal.toSynchronised
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationContext.UseCase
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.serialization.internal.*
@ -17,24 +18,25 @@ import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
*/
class AMQPClientSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*,*>>,
cordappSerializationWhitelists: Set<SerializationWhitelist>,
serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts)
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, serializerFactoriesForContexts)
@Suppress("UNUSED")
constructor() : this(emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
constructor() : this(emptySet(), emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
companion object {
/** Call from main only. */
fun initialiseSerialization(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()) {
nodeSerializationEnv = createSerializationEnv(classLoader, serializerFactoriesForContexts)
fun initialiseSerialization(classLoader: ClassLoader? = null, customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet(), serializationWhitelists: Set<SerializationWhitelist> = emptySet(), serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()) {
nodeSerializationEnv = createSerializationEnv(classLoader, customSerializers, serializationWhitelists, serializerFactoriesForContexts)
}
fun createSerializationEnv(classLoader: ClassLoader? = null, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()): SerializationEnvironment {
fun createSerializationEnv(classLoader: ClassLoader? = null, customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet(), serializationWhitelists: Set<SerializationWhitelist> = emptySet(), serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised()): SerializationEnvironment {
return SerializationEnvironment.with(
SerializationFactoryImpl().apply {
registerScheme(AMQPClientSerializationScheme(emptyList(), serializerFactoriesForContexts))
registerScheme(AMQPClientSerializationScheme(customSerializers, serializationWhitelists, serializerFactoriesForContexts))
},
storageContext = AMQP_STORAGE_CONTEXT,
p2pContext = if (classLoader != null) AMQP_P2P_CONTEXT.withClassLoader(classLoader) else AMQP_P2P_CONTEXT,

View File

@ -0,0 +1,8 @@
package net.corda.core.internal
/**
* Stubbing out non-deterministic method.
*/
fun <T: Any> loadClassesImplementing(classloader: ClassLoader, clazz: Class<T>): Set<T> {
return emptySet()
}

View File

@ -4,6 +4,7 @@ import net.corda.core.serialization.ClassWhitelist
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationContext.UseCase.P2P
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal._contextSerializationEnv
import net.corda.serialization.internal.*
@ -57,15 +58,16 @@ class LocalSerializationRule(private val label: String) : TestRule {
private fun createTestSerializationEnv(): SerializationEnvironment {
val factory = SerializationFactoryImpl(mutableMapOf()).apply {
registerScheme(AMQPSerializationScheme(emptySet(), AccessOrderLinkedHashMap(128)))
registerScheme(AMQPSerializationScheme(emptySet(), emptySet(), AccessOrderLinkedHashMap(128)))
}
return SerializationEnvironment.with(factory, AMQP_P2P_CONTEXT)
}
private class AMQPSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
cordappSerializationWhitelists: Set<SerializationWhitelist>,
serializerFactoriesForContexts: AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts) {
override fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()
}

View File

@ -111,6 +111,8 @@ dependencies {
// required to use @Type annotation
compile "org.hibernate:hibernate-core:$hibernate_version"
compile group: "io.github.classgraph", name: "classgraph", version: class_graph_version
}
// TODO Consider moving it to quasar-utils in the future (introduced with PR-1388)

View File

@ -0,0 +1,31 @@
package net.corda.core.internal
import io.github.classgraph.ClassGraph
import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.StubOutForDJVM
import kotlin.reflect.full.createInstance
/**
* Creates instances of all the classes in the classpath of the provided classloader, which implement the interface of the provided class.
* @param classloader the classloader, which will be searched for the classes.
* @param clazz the class of the interface, which the classes - to be returned - must implement.
*
* @return instances of the identified classes.
* @throws IllegalArgumentException if the classes found do not have proper constructors.
*
* Note: In order to be instantiated, the associated classes must:
* - be non-abstract
* - either be a Kotlin object or have a constructor with no parameters (or only optional ones)
*/
@StubOutForDJVM
fun <T: Any> loadClassesImplementing(classloader: ClassLoader, clazz: Class<T>): Set<T> {
return ClassGraph().addClassLoader(classloader)
.enableAllInfo()
.scan()
.getClassesImplementing(clazz.name)
.filterNot { it.isAbstract }
.mapNotNull { classloader.loadClass(it.name).asSubclass(clazz) }
.map { it.kotlin.objectInstance ?: it.kotlin.createInstance() }
.toSet()
}

View File

@ -19,12 +19,16 @@ import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.internal.AttachmentsClassLoaderBuilder
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.security.PublicKey
import java.util.jar.JarEntry
import java.util.jar.JarInputStream
// *Internal* Corda-specific utilities.

View File

@ -1,6 +1,7 @@
@file:KeepForDJVM
package net.corda.core.serialization
import co.paralleluniverse.io.serialization.Serialization
import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.DoNotImplement
@ -160,6 +161,10 @@ interface SerializationContext {
* The use case we are serializing or deserializing for. See [UseCase].
*/
val useCase: UseCase
/**
* Additional custom serializers that will be made available during (de)serialization.
*/
val customSerializers: Set<SerializationCustomSerializer<*, *>>
/**
* Helper method to return a new context based on this context with the property added.
@ -200,6 +205,11 @@ interface SerializationContext {
*/
fun withWhitelisted(clazz: Class<*>): SerializationContext
/**
* Helper method to return a new context based on this context with the given serializers added.
*/
fun withCustomSerializers(serializers: Set<SerializationCustomSerializer<*, *>>): SerializationContext
/**
* Helper method to return a new context based on this context but with serialization using the format this header sequence represents.
*/
@ -335,3 +345,15 @@ interface ClassWhitelist {
interface EncodingWhitelist {
fun acceptEncoding(encoding: SerializationEncoding): Boolean
}
/**
* Helper method to return a new context based on this context with the given list of classes specifically whitelisted.
*/
fun SerializationContext.withWhitelist(classes: List<Class<*>>): SerializationContext {
var currentContext = this
classes.forEach {
clazz -> currentContext = currentContext.withWhitelisted(clazz)
}
return currentContext
}

View File

@ -2,6 +2,7 @@ package net.corda.core.serialization.internal
import net.corda.core.CordaException
import net.corda.core.KeepForDJVM
import net.corda.core.internal.loadClassesImplementing
import net.corda.core.contracts.Attachment
import net.corda.core.contracts.ContractAttachment
import net.corda.core.contracts.TransactionVerificationException.OverlappingAttachmentsException
@ -10,7 +11,11 @@ import net.corda.core.crypto.sha256
import net.corda.core.internal.*
import net.corda.core.internal.cordapp.targetPlatformVersion
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.MissingAttachmentsException
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationFactory
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.*
import net.corda.core.serialization.internal.AttachmentURLStreamHandlerFactory.toUrl
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
@ -18,6 +23,7 @@ import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStream
import java.net.*
import java.util.*
/**
* A custom ClassLoader that knows how to load classes from a set of attachments. The attachments themselves only
@ -174,34 +180,38 @@ class AttachmentsClassLoader(attachments: List<Attachment>, parent: ClassLoader
}
/**
* This is just a factory that provides a cache to avoid constructing expensive [AttachmentsClassLoader]s.
* This is just a factory that provides caches to optimise expensive construction/loading of classloaders, serializers, whitelisted classes.
*/
@VisibleForTesting
internal object AttachmentsClassLoaderBuilder {
private const val ATTACHMENT_CLASSLOADER_CACHE_SIZE = 1000
private const val CACHE_SIZE = 1000
// This runs in the DJVM so it can't use caffeine.
private val cache: MutableMap<List<SecureHash>, AttachmentsClassLoader> = createSimpleCache<List<SecureHash>, AttachmentsClassLoader>(ATTACHMENT_CLASSLOADER_CACHE_SIZE)
.toSynchronised()
fun build(attachments: List<Attachment>): AttachmentsClassLoader {
return cache.computeIfAbsent(attachments.map { it.id }.sorted()) {
AttachmentsClassLoader(attachments)
}
}
private val cache: MutableMap<Set<SecureHash>, SerializationContext> = createSimpleCache(CACHE_SIZE)
fun <T> withAttachmentsClassloaderContext(attachments: List<Attachment>, block: (ClassLoader) -> T): T {
val attachmentIds = attachments.map { it.id }.toSet()
// Create classloader from the attachments.
val transactionClassLoader = AttachmentsClassLoaderBuilder.build(attachments)
val serializationContext = cache.computeIfAbsent(attachmentIds) {
// Create classloader and load serializers, whitelisted classes
val transactionClassLoader = AttachmentsClassLoader(attachments)
val serializers = loadClassesImplementing(transactionClassLoader, SerializationCustomSerializer::class.java)
val whitelistedClasses = ServiceLoader.load(SerializationWhitelist::class.java, transactionClassLoader)
.flatMap { it.whitelist }
.toList()
// Create a new serializationContext for the current Transaction.
val transactionSerializationContext = SerializationFactory.defaultFactory.defaultContext.withPreventDataLoss().withClassLoader(transactionClassLoader)
// Create a new serializationContext for the current Transaction.
SerializationFactory.defaultFactory.defaultContext
.withPreventDataLoss()
.withClassLoader(transactionClassLoader)
.withWhitelist(whitelistedClasses)
.withCustomSerializers(serializers)
}
// Deserialize all relevant classes in the transaction classloader.
return SerializationFactory.defaultFactory.withCurrentContext(transactionSerializationContext) {
block(transactionClassLoader)
return SerializationFactory.defaultFactory.withCurrentContext(serializationContext) {
block(serializationContext.deserializationClassLoader)
}
}
}

View File

@ -0,0 +1,34 @@
package net.corda.core.internal
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.lang.IllegalArgumentException
class ClassLoadingUtilsTest {
interface BaseInterface {}
interface BaseInterface2 {}
class ConcreteClassWithEmptyConstructor: BaseInterface {}
abstract class AbstractClass: BaseInterface
class ConcreteClassWithNonEmptyConstructor(private val someData: Int): BaseInterface2 {}
@Test
fun predicateClassAreLoadedSuccessfully() {
val classes = loadClassesImplementing(BaseInterface::class.java.classLoader, BaseInterface::class.java)
val classNames = classes.map { it.javaClass.name }
assertThat(classNames).contains(ConcreteClassWithEmptyConstructor::class.java.name)
assertThat(classNames).doesNotContain(AbstractClass::class.java.name)
}
@Test(expected = IllegalArgumentException::class)
fun throwsExceptionWhenClassDoesNotContainProperConstructors() {
val classes = loadClassesImplementing(BaseInterface::class.java.classLoader, BaseInterface2::class.java)
}
}

View File

@ -190,7 +190,7 @@ object TwoPartyDealFlow {
// We set the transaction's time-window: it may be that none of the contracts need this!
// But it can't hurt to have one.
ptx.setTimeWindow(serviceHub.clock.instant(), 30.seconds)
ptx.setTimeWindow(serviceHub.clock.instant(), 60.seconds)
return Triple(ptx, arrayListOf(deal.participants.single { it is Party && serviceHub.myInfo.isLegalIdentity(it) }.owningKey), emptyList())
}
}

View File

@ -47,7 +47,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
val result = if (page.snapshot.states.isNotEmpty()) {
page.snapshot.states.first()
} else {
val r = page.updates.timeout(5, TimeUnit.SECONDS).take(1).toBlocking().single()
val r = page.updates.timeout(10, TimeUnit.SECONDS).take(1).toBlocking().single()
if (r.consumed.isNotEmpty()) r.consumed.first() else r.produced.first()
}
assertNotNull(result)

View File

@ -4,6 +4,7 @@ import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.toSynchronised
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.serialization.internal.CordaSerializationMagic
import net.corda.serialization.internal.amqp.*
import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
@ -14,12 +15,13 @@ import net.corda.serialization.internal.amqp.custom.RxNotificationSerializer
*/
class AMQPServerSerializationScheme(
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
cordappSerializationWhitelists: Set<SerializationWhitelist>,
serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>) : this(cordapps.customSerializers, serializerFactoriesForContexts)
) : AbstractAMQPSerializationScheme(cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts) {
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>, serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, serializerFactoriesForContexts)
constructor() : this(emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised() )
constructor() : this(emptySet(), emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised() )
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
throw UnsupportedOperationException()

View File

@ -59,7 +59,7 @@ class RoundTripObservableSerializerTests {
@Test
fun roundTripTest1() {
val serializationScheme = AMQPRoundTripRPCSerializationScheme(
serializationContext, emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
serializationContext, emptySet(), emptySet(), AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
// Fake up a message ID, needs to be used on both "sides". The server setting it in the subscriptionMap,
// the client as a property of the deserializer which, in the actual RPC client, is pulled off of

View File

@ -4,6 +4,7 @@ import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSer
import net.corda.core.context.Trace
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
import net.corda.nodeapi.RPCApi
import net.corda.serialization.internal.CordaSerializationMagic
@ -20,9 +21,10 @@ import net.corda.client.rpc.internal.ObservableContext as ClientObservableContex
class AMQPRoundTripRPCSerializationScheme(
private val serializationContext: SerializationContext,
cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
cordappSerializationWhitelists: Set<SerializationWhitelist>,
serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>)
: AbstractAMQPSerializationScheme(
cordappCustomSerializers, serializerFactoriesForContexts
cordappCustomSerializers, cordappSerializationWhitelists, serializerFactoriesForContexts
) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return SerializerFactoryBuilder.build(AllWhitelist, javaClass.classLoader).apply {

View File

@ -23,7 +23,7 @@ class AttachmentDemoTest {
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = demoUser, maximumHeapSize = "1g"),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = demoUser, maximumHeapSize = "1g")
).map { it.getOrThrow() }
val webserverHandle = startWebserver(nodeB).getOrThrow()
val webserverHandle = startWebserver(nodeB, "1g").getOrThrow()
val senderThread = supplyAsync {
CordaRPCClient(nodeA.rpcAddress).start(demoUser[0].username, demoUser[0].password).use {

View File

@ -63,7 +63,7 @@ object FixingFlow {
// We set the transaction's time-window: it may be that none of the contracts need this!
// But it can't hurt to have one.
ptx.setTimeWindow(serviceHub.clock.instant(), 30.seconds)
ptx.setTimeWindow(serviceHub.clock.instant(), 60.seconds)
}
@Suspendable

View File

@ -33,6 +33,7 @@ dependencies {
cordapp project(':finance:workflows')
cordapp project(path: ':samples:simm-valuation-demo:contracts-states', configuration: 'shrinkArtifacts')
cordapp project(':samples:simm-valuation-demo:flows')
cordapp project(':confidential-identities')
// Corda integration dependencies
cordaRuntime project(path: ":node:capsule", configuration: 'runtimeArtifacts')
@ -71,6 +72,7 @@ task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar', nodeTask,
cordapp project(':finance:workflows')
cordapp project(':samples:simm-valuation-demo:contracts-states')
cordapp project(':samples:simm-valuation-demo:flows')
cordapp project(':confidential-identities')
rpcUsers = [['username': "default", 'password': "default", 'permissions': [ 'ALL' ]]]
}
node {

View File

@ -0,0 +1,30 @@
package net.corda.vega.plugin.customserializers
import com.opengamma.strata.basics.currency.Currency
import net.corda.core.serialization.SerializationCustomSerializer
@Suppress("UNUSED")
class CurrencySerializer : SerializationCustomSerializer<Currency, CurrencySerializer.Proxy> {
data class Proxy(val currency: String)
override fun fromProxy(proxy: Proxy): Currency {
return withCurrentClassLoader { Currency.parse(proxy.currency) }
}
override fun toProxy(obj: Currency) = Proxy(obj.toString())
/**
* The initialization of [Currency] uses the classpath to identify needed resources.
* However, it gives priority to the classloader in the thread context over the one this class was loaded with.
* See: [com.opengamma.strata.collect.io.ResourceLocator.classLoader]
*
* This is the reason we temporarily override the class loader in the thread context here, with the classloader of this
* class, which is guaranteed to contain everything in the 3rd party library's classpath.
*/
private fun withCurrentClassLoader(serializationFunction: () -> Currency): Currency {
val threadClassLoader = Thread.currentThread().contextClassLoader
Thread.currentThread().contextClassLoader = this.javaClass.classLoader
val result =serializationFunction()
Thread.currentThread().contextClassLoader = threadClassLoader
return result
}
}

View File

@ -1,40 +0,0 @@
package net.corda.vega.plugin
import com.google.common.collect.Ordering
import com.opengamma.strata.basics.currency.Currency
import com.opengamma.strata.basics.currency.CurrencyAmount
import com.opengamma.strata.basics.currency.MultiCurrencyAmount
import com.opengamma.strata.basics.date.Tenor
import com.opengamma.strata.collect.array.DoubleArray
import com.opengamma.strata.market.curve.CurveName
import com.opengamma.strata.market.param.CurrencyParameterSensitivities
import com.opengamma.strata.market.param.CurrencyParameterSensitivity
import com.opengamma.strata.market.param.TenorDateParameterMetadata
import com.opengamma.strata.market.param.ParameterMetadata
import net.corda.core.serialization.SerializationWhitelist
import net.corda.vega.analytics.CordaMarketData
import net.corda.vega.analytics.InitialMarginTriple
/**
* [SimmService] is the object that makes available the flows and services for the Simm agreement / evaluation flow.
* It is loaded via discovery - see [SerializationWhitelist].
* It is also the object that enables a human usable web service for demo purpose
* It is loaded via discovery see [WebServerPluginRegistry].
*/
class SimmFlowsPluginRegistry : SerializationWhitelist {
override val whitelist = listOf(
// MultiCurrencyAmount::class.java,
// Ordering.natural<Comparable<Any>>().javaClass,
// CurrencyAmount::class.java,
// Currency::class.java,
// InitialMarginTriple::class.java,
// CordaMarketData::class.java,
// CurrencyParameterSensitivities::class.java,
// CurrencyParameterSensitivity::class.java,
DoubleArray::class.java
// CurveName::class.java,
// TenorDateParameterMetadata::class.java,
// Tenor::class.java,
// ParameterMetadata::class.java
)
}

View File

@ -1,12 +0,0 @@
package net.corda.vega.plugin.customserializers
import com.opengamma.strata.basics.currency.Currency
import net.corda.core.serialization.SerializationCustomSerializer
@Suppress("UNUSED")
class CurrencySerializer : SerializationCustomSerializer<Currency, CurrencySerializer.Proxy> {
data class Proxy(val currency: String)
override fun fromProxy(proxy: Proxy): Currency = Currency.parse(proxy.currency)
override fun toProxy(obj: Currency) = Proxy(obj.toString())
}

View File

@ -9,9 +9,11 @@ import net.corda.serialization.internal.amqp.AbstractAMQPSerializationScheme
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.http.HttpApi
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.findCordapp
import net.corda.vega.api.PortfolioApi
@ -51,14 +53,15 @@ class SimmValuationTest {
val logConfigFile = projectRootDir / "samples" / "simm-valuation-demo" / "src" / "main" / "resources" / "log4j2.xml"
assertThat(logConfigFile).isRegularFile()
driver(DriverParameters(isDebug = true,
cordappsForAllNodes = listOf(findCordapp("net.corda.vega.flows"), findCordapp("net.corda.vega.contracts")) + FINANCE_CORDAPPS,
systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))
cordappsForAllNodes = listOf(findCordapp("net.corda.vega.flows"), findCordapp("net.corda.vega.contracts"), findCordapp("net.corda.confidential")) + FINANCE_CORDAPPS,
systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()),
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, maximumHeapSize = "1g")))
) {
val nodeAFuture = startNode(providedName = nodeALegalName)
val nodeBFuture = startNode(providedName = nodeBLegalName)
val nodeAFuture = startNode(providedName = nodeALegalName, maximumHeapSize = "1g")
val nodeBFuture = startNode(providedName = nodeBLegalName, maximumHeapSize = "1g")
val (nodeA, nodeB) = listOf(nodeAFuture, nodeBFuture).map { it.getOrThrow() }
val nodeAWebServerFuture = startWebserver(nodeA)
val nodeBWebServerFuture = startWebserver(nodeB)
val nodeAWebServerFuture = startWebserver(nodeA, "1g")
val nodeBWebServerFuture = startWebserver(nodeB, "1g")
val nodeAApi = HttpApi.fromHostAndPort(nodeAWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo")
val nodeBApi = HttpApi.fromHostAndPort(nodeBWebServerFuture.getOrThrow().listenAddress, "api/simmvaluationdemo")
val nodeBParty = getPartyWithName(nodeAApi, nodeBLegalName)
@ -68,10 +71,9 @@ class SimmValuationTest {
assertTradeExists(nodeBApi, nodeAParty, testTradeId)
assertTradeExists(nodeAApi, nodeBParty, testTradeId)
// TODO Dimos - uncomment this on the CORDA-2390 branch to prove that the fix works.
// runValuationsBetween(nodeAApi, nodeBParty)
// assertValuationExists(nodeBApi, nodeAParty)
// assertValuationExists(nodeAApi, nodeBParty)
runValuationsBetween(nodeAApi, nodeBParty)
assertValuationExists(nodeBApi, nodeAParty)
assertValuationExists(nodeAApi, nodeBParty)
}
}

View File

@ -9,14 +9,12 @@ import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.services.Permissions.Companion.all
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.testing.core.BOC_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.core.*
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.InProcess
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
@ -39,12 +37,13 @@ class TraderDemoTest {
driver(DriverParameters(
startNodesInProcess = true,
inMemoryDB = false,
cordappsForAllNodes = FINANCE_CORDAPPS + TestCordapp.findCordapp("net.corda.traderdemo")
cordappsForAllNodes = FINANCE_CORDAPPS + TestCordapp.findCordapp("net.corda.traderdemo"),
notarySpecs = listOf(NotarySpec(DUMMY_NOTARY_NAME, maximumHeapSize = "1g"))
)) {
val (nodeA, nodeB, bankNode) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser))
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), maximumHeapSize = "1g"),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), maximumHeapSize = "1g"),
startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser), maximumHeapSize = "1g")
).map { (it.getOrThrow() as InProcess) }
val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map {

View File

@ -26,7 +26,8 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
override val encoding: SerializationEncoding?,
override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist,
override val lenientCarpenterEnabled: Boolean = false,
override val preventDataLoss: Boolean = false) : SerializationContext {
override val preventDataLoss: Boolean = false,
override val customSerializers: Set<SerializationCustomSerializer<*, *>> = emptySet()) : SerializationContext {
/**
* {@inheritDoc}
*/
@ -56,6 +57,10 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
})
}
override fun withCustomSerializers(serializers: Set<SerializationCustomSerializer<*, *>>): SerializationContextImpl {
return copy(customSerializers = customSerializers.union(serializers))
}
override fun withPreferredSerializationVersion(magic: SerializationMagic) = copy(preferredSerializationVersion = magic)
override fun withEncoding(encoding: SerializationEncoding?) = copy(encoding = encoding)
override fun withEncodingWhitelist(encodingWhitelist: EncodingWhitelist) = copy(encodingWhitelist = encodingWhitelist)

View File

@ -22,7 +22,10 @@ import java.util.*
val AMQP_ENABLED get() = SerializationDefaults.P2P_CONTEXT.preferredSerializationVersion == amqpMagic
data class SerializationFactoryCacheKey(val classWhitelist: ClassWhitelist, val deserializationClassLoader: ClassLoader, val preventDataLoss: Boolean)
data class SerializationFactoryCacheKey(val classWhitelist: ClassWhitelist,
val deserializationClassLoader: ClassLoader,
val preventDataLoss: Boolean,
val customSerializers: Set<SerializationCustomSerializer<*, *>>)
fun SerializerFactory.addToWhitelist(vararg types: Class<*>) {
require(types.toSet().size == types.size) {
@ -43,11 +46,12 @@ interface SerializerFactoryFactory {
@KeepForDJVM
abstract class AbstractAMQPSerializationScheme(
private val cordappCustomSerializers: Set<SerializationCustomSerializer<*, *>>,
private val cordappSerializationWhitelists: Set<SerializationWhitelist>,
maybeNotConcurrentSerializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory>,
val sff: SerializerFactoryFactory = createSerializerFactoryFactory()
) : SerializationScheme {
@DeleteForDJVM
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
constructor(cordapps: List<Cordapp>) : this(cordapps.customSerializers, cordapps.serializationWhitelists, AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>(128).toSynchronised())
// This is a bit gross but a broader check for ConcurrentMap is not allowed inside DJVM.
private val serializerFactoriesForContexts: MutableMap<SerializationFactoryCacheKey, SerializerFactory> = if (maybeNotConcurrentSerializerFactoriesForContexts is AccessOrderLinkedHashMap<*, *>) {
@ -98,9 +102,12 @@ abstract class AbstractAMQPSerializationScheme(
@DeleteForDJVM
val List<Cordapp>.customSerializers
get() = flatMap { it.serializationCustomSerializers }.toSet()
@DeleteForDJVM
val List<Cordapp>.serializationWhitelists
get() = flatMap { it.serializationWhitelists }.toSet()
}
// Parameter "context" is unused directly but passed in by reflection. Removing it will cause failures.
private fun registerCustomSerializers(context: SerializationContext, factory: SerializerFactory) {
with(factory) {
register(publicKeySerializer)
@ -135,9 +142,6 @@ abstract class AbstractAMQPSerializationScheme(
register(net.corda.serialization.internal.amqp.custom.ContractAttachmentSerializer(this))
registerNonDeterministicSerializers(factory)
}
for (whitelistProvider in serializationWhitelists) {
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
}
// If we're passed in an external list we trust that, otherwise revert to looking at the scan of the
// classpath to find custom serializers.
@ -146,6 +150,15 @@ abstract class AbstractAMQPSerializationScheme(
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
} else {
// This step is registering custom serializers, which have been added after node initialisation (i.e. via attachments during transaction verification).
// Note: the order between the registration of customSerializers and cordappCustomSerializers must be preserved as-is. The reason is the following:
// Currently, the serialization infrastructure does not support multiple versions of a class (the first one that is registered dominates).
// As a result, when inside a context with attachments class loader, we prioritize serializers loaded on-demand from attachments to serializers that had been
// loaded during node initialisation, by scanning the cordapps folder.
context.customSerializers.forEach { customSerializer ->
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
}
logger.debug("Custom Serializer list loaded - not scanning classpath")
cordappCustomSerializers.forEach { customSerializer ->
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
@ -159,6 +172,17 @@ abstract class AbstractAMQPSerializationScheme(
}
}
private fun registerCustomWhitelists(factory: SerializerFactory) {
serializationWhitelists.forEach {
factory.addToWhitelist(*it.whitelist.toTypedArray())
}
cordappSerializationWhitelists.forEach {
it.whitelist.forEach {
clazz -> factory.addToWhitelist(clazz)
}
}
}
/*
* Register the serializers which will be excluded from the DJVM.
*/
@ -176,7 +200,7 @@ abstract class AbstractAMQPSerializationScheme(
open val publicKeySerializer: CustomSerializer<*> = net.corda.serialization.internal.amqp.custom.PublicKeySerializer
fun getSerializerFactory(context: SerializationContext): SerializerFactory {
val key = SerializationFactoryCacheKey(context.whitelist, context.deserializationClassLoader, context.preventDataLoss)
val key = SerializationFactoryCacheKey(context.whitelist, context.deserializationClassLoader, context.preventDataLoss, context.customSerializers)
// ConcurrentHashMap.get() is lock free, but computeIfAbsent is not, even if the key is in the map already.
return serializerFactoriesForContexts[key] ?: serializerFactoriesForContexts.computeIfAbsent(key) {
when (context.useCase) {
@ -187,6 +211,7 @@ abstract class AbstractAMQPSerializationScheme(
else -> sff.make(context)
}.also {
registerCustomSerializers(context, it)
registerCustomWhitelists(it)
}
}
}

View File

@ -40,7 +40,7 @@ class AbstractAMQPSerializationSchemeTest {
val factory = SerializerFactoryBuilder.build(TESTING_CONTEXT.whitelist, TESTING_CONTEXT.deserializationClassLoader)
val maxFactories = 512
val backingMap = AccessOrderLinkedHashMap<SerializationFactoryCacheKey, SerializerFactory>({ maxFactories }).toSynchronised()
val scheme = object : AbstractAMQPSerializationScheme(emptySet(), backingMap, createSerializerFactoryFactory()) {
val scheme = object : AbstractAMQPSerializationScheme(emptySet(), emptySet(), backingMap, createSerializerFactoryFactory()) {
override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory {
return factory
}

View File

@ -20,7 +20,19 @@ data class NotarySpec(
val rpcUsers: List<User> = emptyList(),
val verifierType: VerifierType = VerifierType.InMemory,
val cluster: ClusterSpec? = null
)
) {
// These extra fields are handled this way to preserve Kotlin wire compatibility wrt additional parameters with default values.
constructor(name: CordaX500Name,
validating: Boolean = true,
rpcUsers: List<User> = emptyList(),
verifierType: VerifierType = VerifierType.InMemory,
cluster: ClusterSpec? = null,
maximumHeapSize: String = "512m"): this(name, validating, rpcUsers, verifierType, cluster) {
this.maximumHeapSize = maximumHeapSize
}
var maximumHeapSize: String = "512m"
}
/**
* Abstract class specifying information about the consensus algorithm used for a cluster of nodes.

View File

@ -488,7 +488,7 @@ class DriverDSLImpl(
return startRegisteredNode(
spec.name,
localNetworkMap,
NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig + customOverrides)
NodeParameters(rpcUsers = spec.rpcUsers, verifierType = spec.verifierType, customOverrides = notaryConfig + customOverrides, maximumHeapSize = spec.maximumHeapSize)
).map { listOf(it) }
}

View File

@ -31,7 +31,7 @@ class WebserverDriverTests {
fun `starting a node and independent web server works`() {
val addr = driver(DriverParameters(notarySpecs = emptyList())) {
val node = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow()
val webserverHandle = startWebserver(node).getOrThrow()
val webserverHandle = startWebserver(node, "512m").getOrThrow()
webserverMustBeUp(webserverHandle)
webserverHandle.listenAddress
}

View File

@ -199,7 +199,7 @@ class NodeWebServer(val config: WebServerConfig) {
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
log.info("Connecting to node at ${config.rpcAddress} as ${config.runAs}")
val client = CordaRPCClient(config.rpcAddress)
val client = CordaRPCClient(config.rpcAddress, classLoader = javaClass.classLoader)
val connection = client.start(config.runAs.username, config.runAs.password)
return connection.proxy
}