mirror of
synced 2025-03-16 00:55:24 +00:00
Merge pull request #1357 from corda/christians/merge-ENT-2414
OS -> Ent merge up to bc330bd
This commit is contained in:
@ -16,6 +16,7 @@ import javafx.collections.FXCollections
import javafx.collections.ObservableList
import net.corda.client.jfx.utils.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.internal.buildNamed
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache.MapChange
import java.security.PublicKey
@ -40,7 +41,7 @@ class NetworkIdentityModel {
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)
private val identityCache = Caffeine.newBuilder()
.build<PublicKey, ObservableValue<NodeInfo?>>({ publicKey ->
.buildNamed<PublicKey, ObservableValue<NodeInfo?>>("NetworkIdentityModel_identity", { publicKey ->
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
@ -24,10 +24,7 @@ import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSer
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.times
import net.corda.core.internal.*
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.serialize
@ -41,26 +38,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Notification
import rx.Observable
import rx.subjects.UnicastSubject
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.reflect.jvm.javaMethod
@ -183,9 +169,7 @@ class RPCClientProxyHandler(
observablesToReap.locked { observables.add(observableId) }
return Caffeine.newBuilder().
private var sessionFactory: ClientSessionFactory? = null
@ -298,56 +282,71 @@ class RPCClientProxyHandler(
// The handler for Artemis messages.
private fun artemisMessageHandler(message: ClientMessage) {
val serverToClient = RPCApi.ServerToClient.fromClientMessage(serializationContextWithObservableContext, message)
val deduplicationSequenceNumber = message.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
if (deduplicationChecker.checkDuplicateMessageId(serverToClient.deduplicationIdentity, deduplicationSequenceNumber)) {
log.info("Message duplication detected, discarding message")
fun completeExceptionally(id: InvocationId, e: Throwable, future: SettableFuture<Any?>?) {
val rpcCallSite: Throwable? = callSiteMap?.get(id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(e, rpcCallSite)
future?.setException(e.cause ?: e)
log.debug { "Got message from RPC server $serverToClient" }
when (serverToClient) {
is RPCApi.ServerToClient.RpcReply -> {
val replyFuture = rpcReplyMap.remove(serverToClient.id)
if (replyFuture == null) {
log.error("RPC reply arrived to unknown RPC ID ${serverToClient.id}, this indicates an internal RPC error.")
} else {
val result = serverToClient.result
when (result) {
is Try.Success -> replyFuture.set(result.value)
is Try.Failure -> {
val rpcCallSite = callSiteMap?.get(serverToClient.id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(result.exception, rpcCallSite)
is RPCApi.ServerToClient.Observation -> {
val observable = observableContext.observableMap.getIfPresent(serverToClient.id)
if (observable == null) {
log.debug("Observation ${serverToClient.content} arrived to unknown Observable with ID ${serverToClient.id}. " +
"This may be due to an observation arriving before the server was " +
"notified of observable shutdown")
} else {
// We schedule the onNext() on an executor sticky-pooled based on the Observable ID.
observationExecutorPool.run(serverToClient.id) { executor ->
executor.submit {
val content = serverToClient.content
if (content.isOnCompleted || content.isOnError) {
// Add call site information on error
if (content.isOnError) {
val rpcCallSite = callSiteMap?.get(serverToClient.id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(content.throwable, rpcCallSite)
try {
// Deserialize the reply from the server, both the wrapping metadata and the actual body of the return value.
val serverToClient: RPCApi.ServerToClient = try {
RPCApi.ServerToClient.fromClientMessage(serializationContextWithObservableContext, message)
} catch (e: RPCApi.ServerToClient.FailedToDeserializeReply) {
// Might happen if something goes wrong during mapping the response to classes, evolution, class synthesis etc.
log.error("Failed to deserialize RPC body", e)
completeExceptionally(e.id, e, rpcReplyMap.remove(e.id))
val deduplicationSequenceNumber = message.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
if (deduplicationChecker.checkDuplicateMessageId(serverToClient.deduplicationIdentity, deduplicationSequenceNumber)) {
log.info("Message duplication detected, discarding message")
log.debug { "Got message from RPC server $serverToClient" }
when (serverToClient) {
is RPCApi.ServerToClient.RpcReply -> {
val replyFuture = rpcReplyMap.remove(serverToClient.id)
if (replyFuture == null) {
log.error("RPC reply arrived to unknown RPC ID ${serverToClient.id}, this indicates an internal RPC error.")
} else {
val result: Try<Any?> = serverToClient.result
when (result) {
is Try.Success -> replyFuture.set(result.value)
is Try.Failure -> {
completeExceptionally(serverToClient.id, result.exception, replyFuture)
is RPCApi.ServerToClient.Observation -> {
val observable: UnicastSubject<Notification<*>>? = observableContext.observableMap.getIfPresent(serverToClient.id)
if (observable == null) {
log.debug("Observation ${serverToClient.content} arrived to unknown Observable with ID ${serverToClient.id}. " +
"This may be due to an observation arriving before the server was " +
"notified of observable shutdown")
} else {
// We schedule the onNext() on an executor sticky-pooled based on the Observable ID.
observationExecutorPool.run(serverToClient.id) { executor ->
executor.submit {
val content = serverToClient.content
if (content.isOnCompleted || content.isOnError) {
// Add call site information on error
if (content.isOnError) {
val rpcCallSite = callSiteMap?.get(serverToClient.id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(content.throwable, rpcCallSite)
} finally {
@ -97,6 +97,9 @@ dependencies {
// Guava: Google utilities library.
testCompile "com.google.guava:guava:$guava_version"
// For caches rather than guava
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
// Smoke tests do NOT have any Node code on the classpath!
smokeTestCompile project(':smoke-test-utils')
smokeTestCompile "org.assertj:assertj-core:${assertj_version}"
Normal file
Normal file
@ -0,0 +1,41 @@
package net.corda.core.internal
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
* Restrict the allowed characters of a cache name - this ensures that each cache has a name, and that
* the name can be used to create a file name or a metric name.
internal fun checkCacheName(name: String) {
private val allowedChars = Regex("^[0-9A-Za-z_.]*\$")
/* buildNamed is the central helper method to build caffeine caches in Corda.
* This allows to easily add tweaks to all caches built in Corda, and also forces
* cache users to give their cache a (meaningful) name that can be used e.g. for
* capturing cache traces etc.
* Currently it is not used in this version of CORDA, but there are plans to do so.
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String): Cache<K, V> {
return this.build<K, V>()
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loadFunc: (K) -> V): LoadingCache<K, V> {
return this.build<K, V>(loadFunc)
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
return this.build<K, V>(loader)
@ -0,0 +1,24 @@
package net.corda.core.internal
import org.junit.Test
import kotlin.test.assertEquals
class NamedCacheTest {
fun checkNameHelper(name: String, throws: Boolean) {
var exceptionThrown = false
try {
} catch (e: Exception) {
exceptionThrown = true
assertEquals(throws, exceptionThrown)
fun TestCheckCacheName() {
checkNameHelper("abc_123.234", false)
checkNameHelper("", true)
checkNameHelper("abc 123", true)
checkNameHelper("abc/323", true)
@ -1,20 +1,10 @@
buildscript {
// Shaded version of ASM to avoid conflict with root project.
ext.asm_version = '6.1.1'
ext.deterministic_rt_version = '1.0-20180625.120901-7'
repositories {
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.3'
plugins {
id 'com.github.johnrengelman.shadow' version '2.0.4'
plugins {
id 'com.github.johnrengelman.shadow' version '2.0.3'
ext {
// Shaded version of ASM to avoid conflict with root project.
asm_version = '6.1.1'
dependencies {
@ -32,47 +22,16 @@ dependencies {
// Classpath scanner
compile "io.github.lukehutch:fast-classpath-scanner:$fast_classpath_scanner_version"
// Deterministic runtime - used in whitelist generation
runtime "net.corda:deterministic-rt:$deterministic_rt_version:api"
// Test utilities
testCompile "junit:junit:$junit_version"
testCompile "org.assertj:assertj-core:$assertj_version"
repositories {
maven {
url "$artifactory_contextUrl/corda-dev"
task generateWhitelist(type: JavaExec) {
// This is an example of how a whitelist can be generated from a JAR. In most applications though, it is recommended
// thet the minimal set whitelist is used.
def jars = configurations.runtime.collect {
}.findAll {
classpath = sourceSets.main.runtimeClasspath
main = 'net.corda.djvm.tools.cli.Program'
args = ['whitelist', 'generate', '-o', 'src/main/resources/jdk8-deterministic.dat.gz'] + jars
jar {
manifest {
'Automatic-Module-Name': 'net.corda.djvm'
jar.enabled = false
shadowJar {
baseName = "djvm"
classifier = ""
exclude 'deterministic-rt*.jar'
dependencies {
@ -87,3 +46,4 @@ shadowJar {
assemble.dependsOn shadowJar
@ -1,16 +1,15 @@
buildscript {
repositories {
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.3'
plugins {
id 'com.github.johnrengelman.shadow'
repositories {
maven {
url "$artifactory_contextUrl/corda-dev"
plugins {
id 'com.github.johnrengelman.shadow'
configurations {
dependencies {
@ -23,23 +22,29 @@ dependencies {
compile "info.picocli:picocli:$picocli_version"
compile "io.github.lukehutch:fast-classpath-scanner:$fast_classpath_scanner_version"
compile project(path: ":djvm", configuration: "shadow")
// Deterministic runtime - used in whitelist generation
deterministicRt project(path: ':jdk8u-deterministic', configuration: 'jdk')
repositories {
jar {
manifest {
'Main-Class': 'net.corda.djvm.tools.cli.Program',
'Automatic-Module-Name': 'net.corda.djvm',
'Build-Date': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
jar.enabled = false
shadowJar {
baseName = "corda-djvm"
classifier = 'cli'
manifest {
'Automatic-Module-Name': 'net.corda.djvm',
'Main-Class': 'net.corda.djvm.tools.cli.Program',
'Build-Date': new Date().format("yyyy-MM-dd'T'HH:mm:ssZ")
assemble.dependsOn shadowJar
task generateWhitelist(type: JavaExec, dependsOn: shadowJar) {
// This is an example of how a whitelist can be generated from a JAR. In most applications though, it is recommended
// that the minimal set whitelist is used.
main = '-jar'
args = [shadowJar.outputs.files.singleFile, 'whitelist', 'generate', '-o', "$buildDir/jdk8-deterministic.dat.gz", configurations.deterministicRt.files[0] ]
Binary file not shown.
Binary file not shown.
@ -6,6 +6,8 @@ release, see :doc:`upgrade-notes`.
* Removed experimental feature `CordformDefinition`
* Vault query fix: support query by parent classes of Contract State classes (see https://github.com/corda/corda/issues/3714)
* Added ``registerResponderFlow`` method to ``StartedMockNode``, to support isolated testing of responder flow behaviour.
@ -45,3 +45,7 @@ task copyJdk(type: Copy) {
assemble.dependsOn copyJdk
jar.enabled = false
artifacts {
jdk file: file(rt_jar), type: 'jar', builtBy: copyJdk
@ -36,8 +36,6 @@ dependencies {
// TODO: Remove this dependency and the code that requires it
compile "commons-fileupload:commons-fileupload:$fileupload_version"
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
// TypeSafe Config: for simple and human friendly config files.
compile "com.typesafe:config:$typesafe_config_version"
@ -24,7 +24,7 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.Try
import org.apache.activemq.artemis.api.core.ActiveMQBuffer
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
import org.apache.activemq.artemis.api.core.management.ManagementHelper
import org.apache.activemq.artemis.reader.MessageUtil
@ -222,6 +222,11 @@ object RPCApi {
* Thrown if the RPC reply body couldn't be deserialized.
class FailedToDeserializeReply(val id: InvocationId, cause: Throwable) : RuntimeException("Failed to deserialize RPC reply: ${cause.message}", cause)
companion object {
private fun Any.safeSerialize(context: SerializationContext, wrap: (Throwable) -> Any) = try {
serialize(context = context)
@ -236,10 +241,18 @@ object RPCApi {
RPCApi.ServerToClient.Tag.RPC_REPLY -> {
val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id)
// The result here is a Try<> that represents the attempt to try the operation on the server side.
// If anything goes wrong with deserialisation of the response, we propagate it differently because
// we also need to pass through the invocation and dedupe IDs.
val result: Try<Any?> = try {
message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
} catch (e: Exception) {
throw FailedToDeserializeReply(id, e)
id = id,
deduplicationIdentity = deduplicationIdentity,
result = message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
result = result
RPCApi.ServerToClient.Tag.OBSERVATION -> {
@ -12,6 +12,7 @@ package net.corda.nodeapi.internal
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.buildNamed
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
@ -19,11 +20,11 @@ import java.util.concurrent.atomic.AtomicLong
* A class allowing the deduplication of a strictly incrementing sequence number.
class DeduplicationChecker(cacheExpiry: Duration) {
class DeduplicationChecker(cacheExpiry: Duration, name: String = "DeduplicationChecker") {
// dedupe identity -> watermark cache
private val watermarkCache = Caffeine.newBuilder()
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
.buildNamed("${name}_watermark", WatermarkCacheLoader)
private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong> {
override fun load(key: Any) = AtomicLong(-1)
@ -0,0 +1,4 @@
package net.corda.nodeapi.internal
// TODO: Add to Corda node.conf to allow customisation
const val NODE_INFO_DIRECTORY = "additional-node-infos"
@ -12,7 +12,6 @@ package net.corda.nodeapi.internal.network
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
@ -56,11 +55,11 @@ import kotlin.streams.toList
// TODO Move this to tools:bootstrapper
class NetworkBootstrapper
internal constructor(private val initSerEnv: Boolean,
private val embeddedCordaJar: () -> InputStream,
private val nodeInfosGenerator: (List<Path>) -> List<Path>,
private val contractsJarConverter: (Path) -> ContractsJar) {
internal constructor(private val initSerEnv: Boolean,
private val embeddedCordaJar: () -> InputStream,
private val nodeInfosGenerator: (List<Path>) -> List<Path>,
private val contractsJarConverter: (Path) -> ContractsJar) {
constructor() : this(
initSerEnv = true,
@ -112,7 +111,7 @@ class NetworkBootstrapper
throw IllegalStateException("Error while generating node info file. Please check the logs in $logsDir.")
check(process.exitValue() == 0) { "Error while generating node info file. Please check the logs in $logsDir." }
check(process.exitValue() == 0) { "Error while generating node info file. Please check the logs in $logsDir." }
return nodeDir.list { paths ->
paths.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
@ -279,7 +278,7 @@ class NetworkBootstrapper
private fun distributeNodeInfos(nodeDirs: List<Path>, nodeInfoFiles: List<Path>) {
for (nodeDir in nodeDirs) {
val additionalNodeInfosDir = (nodeDir / CordformNode.NODE_INFO_DIRECTORY).createDirectories()
val additionalNodeInfosDir = (nodeDir / NODE_INFO_DIRECTORY).createDirectories()
for (nodeInfoFile in nodeInfoFiles) {
nodeInfoFile.copyToDirectory(additionalNodeInfosDir, REPLACE_EXISTING)
@ -374,10 +373,10 @@ class NetworkBootstrapper
private fun NodeInfo.notaryIdentity(): Party {
return when (legalIdentities.size) {
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
// Single node notaries have just one identity like all other nodes. This identity is the notary identity
1 -> legalIdentities[0]
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
// cluster and is shared by all the other members. This is the notary identity.
// Nodes which are part of a distributed notary have a second identity which is the composite identity of the
// cluster and is shared by all the other members. This is the notary identity.
2 -> legalIdentities[1]
else -> throw IllegalArgumentException("Not sure how to get the notary identity in this scenerio: $this")
@ -10,9 +10,9 @@
package net.corda.nodeapi.internal.network
import net.corda.cordform.CordformNode
import net.corda.core.internal.*
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import rx.Observable
import rx.Scheduler
import rx.Subscription
@ -106,10 +106,11 @@ class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseabl
private fun poll() {
nodeDataMapBox.locked {
for (nodeData in values) {
nodeData.nodeDir.list { paths -> paths
.filter { it.isRegularFile() }
.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }
.forEach { processPath(nodeData, it) }
nodeData.nodeDir.list { paths ->
.filter { it.isRegularFile() }
.filter { it.fileName.toString().startsWith(NODE_INFO_FILE_NAME_PREFIX) }
.forEach { processPath(nodeData, it) }
@ -159,7 +160,7 @@ class NodeInfoFilesCopier(scheduler: Scheduler = Schedulers.io()) : AutoCloseabl
* Convenience holder for all the paths and files relative to a single node.
private class NodeData(val nodeDir: Path) {
val additionalNodeInfoDirectory: Path = nodeDir.resolve(CordformNode.NODE_INFO_DIRECTORY)
val additionalNodeInfoDirectory: Path = nodeDir.resolve(NODE_INFO_DIRECTORY)
// Map from Path to its lastModifiedTime.
val previouslySeenFiles = mutableMapOf<Path, FileTime>()
@ -11,6 +11,7 @@
package net.corda.nodeapi.internal.persistence
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.buildNamed
import net.corda.core.internal.castIfPossible
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
@ -72,7 +73,7 @@ class HibernateConfiguration(
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).build<Set<MappedSchema>, SessionFactory>()
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).buildNamed<Set<MappedSchema>, SessionFactory>("HibernateConfiguration_sessionFactories")
val sessionFactoryForRegisteredSchemas = schemas.let {
logger.info("Init HibernateConfiguration for schemas: $it")
@ -1,7 +1,6 @@
package net.corda.nodeapi.internal.network
import com.typesafe.config.ConfigFactory
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
import net.corda.core.crypto.secureRandomBytes
import net.corda.core.crypto.sha256
import net.corda.core.identity.CordaX500Name
@ -11,6 +10,7 @@ import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.config.toConfig
@ -10,11 +10,11 @@
package net.corda.nodeapi.internal.network
import net.corda.cordform.CordformNode
import net.corda.core.internal.div
import net.corda.core.internal.list
import net.corda.core.internal.write
import net.corda.nodeapi.eventually
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Rule
@ -44,12 +44,12 @@ class NodeInfoFilesCopierTest {
private val rootPath get() = folder.root.toPath()
private val scheduler = TestScheduler()
private fun nodeDir(nodeBaseDir : String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
private fun nodeDir(nodeBaseDir: String) = rootPath.resolve(nodeBaseDir).resolve(ORGANIZATION.toLowerCase())
private val node1RootPath by lazy { nodeDir(NODE_1_PATH) }
private val node2RootPath by lazy { nodeDir(NODE_2_PATH) }
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
private val node2AdditionalNodeInfoPath by lazy { node2RootPath.resolve(CordformNode.NODE_INFO_DIRECTORY) }
private val node1AdditionalNodeInfoPath by lazy { node1RootPath.resolve(NODE_INFO_DIRECTORY) }
private val node2AdditionalNodeInfoPath by lazy { node2RootPath.resolve(NODE_INFO_DIRECTORY) }
private lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
@ -82,6 +82,7 @@ dependencies {
compile project(':client:rpc')
compile project(':tools:shell')
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
// Log4J: logging framework (with SLF4J bindings)
@ -11,6 +11,7 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.transpose
@ -19,6 +20,9 @@ import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.config.configureDevKeyAndTrustStores
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
@ -252,7 +256,7 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
private fun NodeHandle.onlySees(vararg nodes: NodeInfo) {
// Make sure the nodes aren't getting the node infos from their additional directories
val nodeInfosDir = baseDirectory / CordformNode.NODE_INFO_DIRECTORY
val nodeInfosDir = baseDirectory / NODE_INFO_DIRECTORY
if (nodeInfosDir.exists()) {
@ -15,6 +15,7 @@ import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.primitives.Ints
import net.corda.core.context.AuthServiceId
import net.corda.core.internal.buildNamed
import net.corda.core.internal.uncheckedCast
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.DataSourceFactory
@ -318,7 +319,7 @@ private class CaffeineCacheManager(val maxSize: Long,
return Caffeine.newBuilder()
.expireAfterWrite(timeToLiveSeconds, TimeUnit.SECONDS)
.build<K, V>()
.buildNamed<K, V>("RPCSecurityManagerShiroCache_$name")
@ -45,6 +45,7 @@ class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceIn
fun createPKMap(): AppendOnlyPersistentMap<SecureHash, PartyAndCertificate, PersistentIdentity, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = {
@ -61,6 +62,7 @@ class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceIn
fun createX500Map(): AppendOnlyPersistentMap<CordaX500Name, SecureHash, PersistentIdentityNames, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = { Pair(CordaX500Name.parse(it.name), SecureHash.parse(it.publicKeyHash)) },
toPersistentEntity = { key: CordaX500Name, value: SecureHash ->
@ -58,6 +58,7 @@ class PersistentKeyManagementService(val identityService: PersistentIdentityServ
private companion object {
fun createKeyMap(): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toStringShort() },
fromPersistentEntity = {
@ -42,6 +42,7 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString },
fromPersistentEntity = { Pair(DeduplicationId(it.id), MessageMeta(it.insertionTime, it.hash, it.seqNo)) },
toPersistentEntity = { key: DeduplicationId, value: MessageMeta ->
@ -23,6 +23,7 @@ import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.buildNamed
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
@ -163,7 +164,7 @@ class RPCServer(
log.debug { "Unsubscribing from Observable with id $key because of $cause" }
return Caffeine.newBuilder().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build()
return Caffeine.newBuilder().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).buildNamed("RPCServer_observableSubscription")
fun start(activeMqServerControl: ActiveMQServerControl) {
@ -10,7 +10,6 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.*
import net.corda.core.node.NodeInfo
@ -21,6 +20,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
@ -74,7 +74,8 @@ class NodeInfoWatcher(private val nodePath: Path,
internal data class NodeInfoFromFile(val nodeInfohash: SecureHash, val lastModified: FileTime)
private val nodeInfosDir = nodePath / CordformNode.NODE_INFO_DIRECTORY
private val nodeInfosDir = nodePath / NODE_INFO_DIRECTORY
private val nodeInfoFilesMap = HashMap<Path, NodeInfoFromFile>()
val processedNodeInfoHashes: Set<SecureHash> get() = nodeInfoFilesMap.values.map { it.nodeInfohash }.toSet()
@ -84,7 +85,7 @@ class NodeInfoWatcher(private val nodePath: Path,
* Read all the files contained in [nodePath] / [CordformNode.NODE_INFO_DIRECTORY] and keep watching
* Read all the files contained in [nodePath] / [NODE_INFO_DIRECTORY] and keep watching
* the folder for further updates.
* We simply list the directory content every 5 seconds, the Java implementation of WatchService has been proven to
@ -132,7 +132,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]!!
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024) { key ->
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(
1024) { key ->
database.transaction { queryByIdentityKey(session, key) }
@ -150,7 +152,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
return identityByLegalNameCache.get(name)!!.orElse(null)
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024) { name ->
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(
1024) { name ->
Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) })
@ -63,6 +63,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
fun createTransactionsMap(maxSizeInBytes: Long)
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
name = "DBTransactionStorage_transactions",
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = {
@ -216,6 +216,7 @@ class NodeAttachmentService(
// a problem somewhere else or this needs to be revisited.
private val attachmentContentCache = NonInvalidatingWeightBasedCache(
name = "NodeAttachmentService_attachmentContent",
maxWeight = attachmentContentCacheSize,
weigher = Weigher<SecureHash, Optional<Pair<Attachment, ByteArray>>> { key, value -> key.size + if (value.isPresent) value.get().second.size else 0 },
loadFunction = { Optional.ofNullable(loadAttachmentContent(it)) }
@ -236,7 +237,9 @@ class NodeAttachmentService(
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(attachmentCacheBound) { key ->
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(
attachmentCacheBound) { key ->
@ -58,6 +58,7 @@ class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private
internal val map = PersistentMap(
{ key -> key },
{ entity -> entity.key to entity.value!! },
@ -112,6 +112,7 @@ class BFTNonValidatingNotaryService(
private fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = {
//TODO null check will become obsolete after making DB/JPA columns not nullable
@ -87,6 +87,7 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = {
//TODO null check will become obsolete after making DB/JPA columns not nullable
@ -71,6 +71,7 @@ class RaftUniquenessProvider(
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
toPersistentEntityKey = { PersistentStateRef(it) },
fromPersistentEntity = {
val txId = it.id.txId
@ -38,6 +38,7 @@ class ContractUpgradeServiceImpl : ContractUpgradeService, SingletonSerializeAsT
private companion object {
fun createContractUpgradesMap(): PersistentMap<String, String, DBContractUpgrade, String> {
return PersistentMap(
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
toPersistentEntity = { key: String, value: String ->
@ -319,6 +319,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
// Open for tests to override
open class AppendOnlyPersistentMap<K, V, E, out EK>(
name: String,
toPersistentEntityKey: (K) -> EK,
fromPersistentEntity: (E) -> Pair<K, V>,
toPersistentEntity: (key: K, value: V) -> E,
@ -331,6 +332,7 @@ open class AppendOnlyPersistentMap<K, V, E, out EK>(
persistentEntityClass) {
//TODO determine cacheBound based on entity class later or with node config allowing tuning, or using some heuristic based on heap size
override val cache = NonInvalidatingCache(
name = name,
bound = cacheBound,
loadFunction = { key: K ->
// This gets called if a value is read and the cache has no Transactional for this key yet.
@ -363,6 +365,7 @@ open class AppendOnlyPersistentMap<K, V, E, out EK>(
// Same as above, but with weighted values (e.g. memory footprint sensitive).
class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
name: String,
toPersistentEntityKey: (K) -> EK,
fromPersistentEntity: (E) -> Pair<K, V>,
toPersistentEntity: (key: K, value: V) -> E,
@ -375,6 +378,7 @@ class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
persistentEntityClass) {
override val cache = NonInvalidatingWeightBasedCache(
maxWeight = maxWeight,
weigher = Weigher { key, value -> weighingFunc(key, value) },
loadFunction = { key: K ->
@ -14,18 +14,19 @@ import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import com.github.benmanes.caffeine.cache.Weigher
import net.corda.core.internal.buildNamed
class NonInvalidatingCache<K, V> private constructor(
val cache: LoadingCache<K, V>
) : LoadingCache<K, V> by cache {
constructor(bound: Long, loadFunction: (K) -> V) :
this(buildCache(bound, loadFunction))
constructor(name: String, bound: Long, loadFunction: (K) -> V) :
this(buildCache(name, bound, loadFunction))
private companion object {
private fun <K, V> buildCache(bound: Long, loadFunction: (K) -> V): LoadingCache<K, V> {
private fun <K, V> buildCache(name: String, bound: Long, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = Caffeine.newBuilder().maximumSize(bound)
return builder.build(NonInvalidatingCacheLoader(loadFunction))
return builder.buildNamed(name, NonInvalidatingCacheLoader(loadFunction))
@ -42,13 +43,13 @@ class NonInvalidatingCache<K, V> private constructor(
class NonInvalidatingWeightBasedCache<K, V> private constructor(
val cache: LoadingCache<K, V>
) : LoadingCache<K, V> by cache {
constructor (maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V) :
this(buildCache(maxWeight, weigher, loadFunction))
constructor (name: String, maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V) :
this(buildCache(name, maxWeight, weigher, loadFunction))
private companion object {
private fun <K, V> buildCache(maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V): LoadingCache<K, V> {
private fun <K, V> buildCache(name: String, maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = Caffeine.newBuilder().maximumWeight(maxWeight).weigher(weigher)
return builder.build(NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction))
return builder.buildNamed(name, NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction))
@ -15,20 +15,21 @@ import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import com.github.benmanes.caffeine.cache.RemovalListener
import net.corda.core.internal.buildNamed
class NonInvalidatingUnboundCache<K, V> private constructor(
val cache: LoadingCache<K, V>
) : LoadingCache<K, V> by cache {
constructor(loadFunction: (K) -> V, removalListener: RemovalListener<K, V> = RemovalListener { _, _, _ -> },
constructor(name: String, loadFunction: (K) -> V, removalListener: RemovalListener<K, V> = RemovalListener { _, _, _ -> },
keysToPreload: () -> Iterable<K> = { emptyList() }) :
this(buildCache(loadFunction, removalListener, keysToPreload))
this(buildCache(name, loadFunction, removalListener, keysToPreload))
private companion object {
private fun <K, V> buildCache(loadFunction: (K) -> V, removalListener: RemovalListener<K, V>,
private fun <K, V> buildCache(name: String, loadFunction: (K) -> V, removalListener: RemovalListener<K, V>,
keysToPreload: () -> Iterable<K>): LoadingCache<K, V> {
val builder = Caffeine.newBuilder().removalListener(removalListener).executor(SameThreadExecutor.getExecutor())
return builder.build(NonInvalidatingCacheLoader(loadFunction)).apply {
return builder.buildNamed(name, NonInvalidatingCacheLoader(loadFunction)).apply {
@ -20,6 +20,7 @@ import java.util.*
* Implements an unbound caching layer on top of a table accessed via Hibernate mapping.
class PersistentMap<K : Any, V, E, out EK>(
name: String,
val toPersistentEntityKey: (K) -> EK,
val fromPersistentEntity: (E) -> Pair<K, V>,
val toPersistentEntity: (key: K, value: V) -> E,
@ -31,6 +32,7 @@ class PersistentMap<K : Any, V, E, out EK>(
private val cache = NonInvalidatingUnboundCache(
loadFunction = { key -> Optional.ofNullable(loadValue(key)) },
removalListener = ExplicitRemoval(toPersistentEntityKey, persistentEntityClass)
@ -13,7 +13,6 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.*
import net.corda.cordform.CordformNode.NODE_INFO_DIRECTORY
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sign
@ -25,6 +24,7 @@ import net.corda.core.node.NodeInfo
import net.corda.core.serialization.serialize
import net.corda.core.utilities.millis
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
@ -12,11 +12,11 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import net.corda.cordform.CordformNode
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.size
import net.corda.core.node.services.KeyManagementService
import net.corda.nodeapi.internal.NODE_INFO_DIRECTORY
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.testing.core.ALICE_NAME
@ -61,7 +61,7 @@ class NodeInfoWatcherTest {
val identityService = makeTestIdentityService()
keyManagementService = MockKeyManagementService(identityService)
nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler)
nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY
nodeInfoPath = tempFolder.root.toPath() / NODE_INFO_DIRECTORY
@ -271,6 +271,7 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
class TestMap : AppendOnlyPersistentMap<Long, String, PersistentMapEntry, Long>(
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.key, it.value) },
toPersistentEntity = { key: Long, value: String ->
@ -16,6 +16,7 @@ class PersistentMapTests {
//create a test map using an existing db table
private fun createTestMap(): PersistentMap<String, String, ContractUpgradeServiceImpl.DBContractUpgrade, String> {
return PersistentMap(
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
toPersistentEntity = { key: String, value: String ->
@ -17,21 +17,6 @@ apply plugin: 'net.corda.plugins.cordapp'
apply plugin: 'net.corda.plugins.cordformation'
apply plugin: 'maven-publish'
sourceSets {
integrationTest {
kotlin {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/kotlin')
configurations {
integrationTestCompile.extendsFrom testCompile
integrationTestRuntime.extendsFrom testRuntime
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
@ -58,13 +43,46 @@ dependencies {
task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar']) {
// CordformationDefinition is an experimental feature
definitionClass = 'net.corda.bank.BankOfCordaCordform'
task integrationTest(type: Test, dependsOn: []) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
nodeDefaults {
cordapp project(':finance')
node {
name "O=Notary Service,L=Zurich,C=CH"
notary = [validating: true]
p2pPort 10002
rpcSettings {
address "localhost:10003"
adminAddress "localhost:10004"
extraConfig = [h2Settings: [address: "localhost:10016"]]
node {
name "O=BankOfCorda,L=London,C=GB"
p2pPort 10005
rpcSettings {
address "localhost:10006"
adminAddress "localhost:10015"
webPort 10007
rpcUsers = [[user: "bankUser", password: "test", permissions: ["ALL"]]]
extraConfig = [
custom : [issuableCurrencies: ["USD"]],
h2Settings: [address: "localhost:10017"]
node {
name "O=BigCorporation,L=New York,C=US"
p2pPort 10008
rpcSettings {
address "localhost:10009"
adminAddress "localhost:10011"
webPort 10010
rpcUsers = [[user: "bigCorpUser", password: "test", permissions: ["ALL"]]]
extraConfig = [
h2Settings: [address: "localhost:10018"]
idea {
@ -1,51 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.bank
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.withoutIssuer
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.utils.sumCash
import net.corda.testing.core.BOC_NAME
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.internal.demorun.nodeRunner
import org.assertj.core.api.Assertions.assertThat
import org.junit.ClassRule
import org.junit.Test
class BankOfCordaCordformTest : IntegrationTest() {
companion object {
val databaseSchemas = IntegrationTestSchemas("NotaryService", "BankOfCorda", BIGCORP_NAME.organisation)
fun `run demo`() {
BankOfCordaCordform().nodeRunner().scanPackages(listOf("net.corda.finance")).deployAndRunNodesAndThen {
CordaRPCClient(NetworkHostAndPort("localhost", BOC_RPC_PORT)).use(BOC_RPC_USER, BOC_RPC_PWD) {
assertThat(it.proxy.vaultQuery(Cash.State::class.java).states).isEmpty() // All of the issued cash is transferred
CordaRPCClient(NetworkHostAndPort("localhost", BIGCORP_RPC_PORT)).use(BIGCORP_RPC_USER, BIGCORP_RPC_PWD) {
val cashStates = it.proxy.vaultQuery(Cash.State::class.java).states.map { it.state.data }
val knownOwner = it.proxy.wellKnownPartyFromAnonymous(cashStates.map { it.owner }.toSet().single())
val totalCash = cashStates.sumCash()
@ -1,159 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.bank
import joptsimple.OptionParser
import net.corda.bank.api.BankOfCordaClientApi
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.cordform.CordappDependency
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.core.contracts.Amount
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.Permissions.Companion.all
import net.corda.node.services.config.NotaryConfig
import net.corda.testing.node.internal.demorun.*
import net.corda.testing.core.BOC_NAME
import net.corda.testing.node.User
import java.util.*
import kotlin.system.exitProcess
val BIGCORP_NAME = CordaX500Name(organisation = "BigCorporation", locality = "New York", country = "US")
private val NOTARY_NAME = CordaX500Name(organisation = "Notary Service", locality = "Zurich", country = "CH")
const val BOC_RPC_PORT = 10006
const val BIGCORP_RPC_PORT = 10009
private const val BOC_RPC_ADMIN_PORT = 10015
private const val BOC_WEB_PORT = 10007
const val BOC_RPC_USER = "bankUser"
const val BOC_RPC_PWD = "test"
const val BIGCORP_RPC_USER = "bigCorpUser"
const val BIGCORP_RPC_PWD = "test"
class BankOfCordaCordform : CordformDefinition() {
init {
node {
notary(NotaryConfig(validating = true))
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:10016"))
node {
extraConfig = mapOf("custom" to mapOf("issuableCurrencies" to listOf("USD")),
"h2Settings" to mapOf("address" to "localhost:10017"))
rpcSettings {
rpcUsers(User(BOC_RPC_USER, BOC_RPC_PWD, setOf(all())))
node {
rpcSettings {
rpcUsers(User(BIGCORP_RPC_USER, BIGCORP_RPC_PWD, setOf(all())))
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:10018"))
override fun setup(context: CordformContext) = Unit
override fun getCordappDependencies(): List<CordappDependency> {
return listOf(CordappDependency(":finance"))
object DeployNodes {
fun main(args: Array<String>) {
object IssueCash {
fun main(args: Array<String>) {
val parser = OptionParser()
val roleArg = parser.accepts("role").withRequiredArg().ofType(Role::class.java).describedAs("[ISSUER|ISSUE_CASH_RPC|ISSUE_CASH_WEB]")
val quantity = parser.accepts("quantity").withOptionalArg().ofType(Long::class.java)
val currency = parser.accepts("currency").withOptionalArg().ofType(String::class.java).describedAs("[GBP|USD|CHF|EUR]")
val options = try {
} catch (e: Exception) {
val role = options.valueOf(roleArg)!!
val amount = Amount(options.valueOf(quantity), Currency.getInstance(options.valueOf(currency)))
when (role) {
println("Requesting Cash via RPC ...")
val result = requestRpcIssue(amount)
println("Success!! Your transaction receipt is ${result.tx.id}")
println("Requesting Cash via Web ...")
println("Successfully processed Cash Issue request")
fun requestRpcIssue(amount: Amount<Currency>): SignedTransaction {
return BankOfCordaClientApi.requestRPCIssue(NetworkHostAndPort("localhost", BOC_RPC_PORT), createParams(amount, NOTARY_NAME))
private fun requestWebIssue(amount: Amount<Currency>) {
BankOfCordaClientApi.requestWebIssue(NetworkHostAndPort("localhost", BOC_WEB_PORT), createParams(amount, NOTARY_NAME))
private fun createParams(amount: Amount<Currency>, notaryName: CordaX500Name): IssueRequestParams {
return IssueRequestParams(amount, BIGCORP_NAME, "1", BOC_NAME, notaryName)
private fun printHelp(parser: OptionParser) {
Usage: bank-of-corda --role ISSUER
bank-of-corda --role (ISSUE_CASH_RPC|ISSUE_CASH_WEB) --quantity <quantity> --currency <currency>
Please refer to the documentation in docs/build/index.html for more info.
enum class Role {
@ -0,0 +1,77 @@
package net.corda.bank
import joptsimple.OptionParser
import net.corda.bank.api.BankOfCordaClientApi
import net.corda.bank.api.BankOfCordaWebApi
import net.corda.core.contracts.Amount
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.core.BOC_NAME
import java.util.*
import kotlin.system.exitProcess
object IssueCash {
private val NOTARY_NAME = CordaX500Name(organisation = "Notary Service", locality = "Zurich", country = "CH")
private val BIGCORP_NAME = CordaX500Name(organisation = "BigCorporation", locality = "New York", country = "US")
private const val BOC_RPC_PORT = 10006
private const val BOC_WEB_PORT = 10007
fun main(args: Array<String>) {
val parser = OptionParser()
val roleArg = parser.accepts("role").withRequiredArg().ofType(Role::class.java).describedAs("[ISSUER|ISSUE_CASH_RPC|ISSUE_CASH_WEB]")
val quantity = parser.accepts("quantity").withOptionalArg().ofType(Long::class.java)
val currency = parser.accepts("currency").withOptionalArg().ofType(String::class.java).describedAs("[GBP|USD|CHF|EUR]")
val options = try {
} catch (e: Exception) {
val role = options.valueOf(roleArg)!!
val amount = Amount(options.valueOf(quantity), Currency.getInstance(options.valueOf(currency)))
when (role) {
println("Requesting Cash via RPC ...")
val result = requestRpcIssue(amount)
println("Success!! Your transaction receipt is ${result.tx.id}")
println("Requesting Cash via Web ...")
println("Successfully processed Cash Issue request")
fun requestRpcIssue(amount: Amount<Currency>): SignedTransaction {
return BankOfCordaClientApi.requestRPCIssue(NetworkHostAndPort("localhost", BOC_RPC_PORT), createParams(amount, NOTARY_NAME))
private fun requestWebIssue(amount: Amount<Currency>) {
BankOfCordaClientApi.requestWebIssue(NetworkHostAndPort("localhost", BOC_WEB_PORT), createParams(amount, NOTARY_NAME))
private fun createParams(amount: Amount<Currency>, notaryName: CordaX500Name): BankOfCordaWebApi.IssueRequestParams {
return BankOfCordaWebApi.IssueRequestParams(amount, BIGCORP_NAME, "1", BOC_NAME, notaryName)
private fun printHelp(parser: OptionParser) {
Usage: bank-of-corda --role ISSUER
bank-of-corda --role (ISSUE_CASH_RPC|ISSUE_CASH_WEB) --quantity <quantity> --currency <currency>
Please refer to the documentation in docs/build/index.html for more info.
enum class Role {
@ -10,8 +10,6 @@
package net.corda.bank.api
import net.corda.bank.BOC_RPC_PWD
import net.corda.bank.BOC_RPC_USER
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow
@ -26,6 +24,9 @@ import net.corda.testing.http.HttpApi
* Interface for communicating with Bank of Corda node
object BankOfCordaClientApi {
const val BOC_RPC_USER = "bankUser"
const val BOC_RPC_PWD = "test"
@ -50,7 +51,8 @@ object BankOfCordaClientApi {
// Resolve parties via RPC
val issueToParty = rpc.wellKnownPartyFromX500Name(params.issueToPartyName)
?: throw IllegalStateException("Unable to locate ${params.issueToPartyName} in Network Map Service")
val notaryLegalIdentity = rpc.notaryIdentities().firstOrNull { it.name == params.notaryName } ?: throw IllegalStateException("Couldn't locate notary ${params.notaryName} in NetworkMapCache")
val notaryLegalIdentity = rpc.notaryIdentities().firstOrNull { it.name == params.notaryName }
?: throw IllegalStateException("Couldn't locate notary ${params.notaryName} in NetworkMapCache")
val anonymous = true
val issuerBankPartyRef = OpaqueBytes.of(params.issuerBankPartyRef.toByte())
@ -59,19 +59,229 @@ publishing {
task deployNodes(dependsOn: ['deployNodesSingle', 'deployNodesRaft', 'deployNodesBFT', 'deployNodesCustom'])
task deployNodesSingle(type: Cordform, dependsOn: 'jar') {
definitionClass = 'net.corda.notarydemo.SingleNotaryCordform'
directory file("$buildDir/nodes/nodesSingle")
nodeDefaults {
extraConfig = [h2Settings: [address: "localhost:0"]]
node {
name "O=Alice Corp,L=Madrid,C=ES"
p2pPort 10002
rpcSettings {
address "localhost:10003"
adminAddress "localhost:10103"
rpcUsers = [[user: "demou", password: "demop", permissions: ["ALL"]]]
node {
name "O=Bob Plc,L=Rome,C=IT"
p2pPort 10005
rpcSettings {
address "localhost:10006"
adminAddress "localhost:10106"
node {
name "O=Notary Service,L=Zurich,C=CH"
p2pPort 10009
rpcSettings {
address "localhost:10010"
adminAddress "localhost:10110"
notary = [validating: true]
task deployNodesCustom(type: Cordform, dependsOn: 'jar') {
definitionClass = 'net.corda.notarydemo.CustomNotaryCordform'
directory file("$buildDir/nodes/nodesCustom")
nodeDefaults {
extraConfig = [h2Settings: [address: "localhost:0"]]
node {
name "O=Alice Corp,L=Madrid,C=ES"
p2pPort 10002
rpcSettings {
address "localhost:10003"
adminAddress "localhost:10103"
rpcUsers = [[user: "demou", password: "demop", permissions: ["ALL"]]]
node {
name "O=Bob Plc,L=Rome,C=IT"
p2pPort 10005
rpcSettings {
address "localhost:10006"
adminAddress "localhost:10106"
node {
name "O=Notary Service,L=Zurich,C=CH"
p2pPort 10009
rpcSettings {
address "localhost:10010"
adminAddress "localhost:10110"
notary = [validating: true, "custom": true]
task deployNodesRaft(type: Cordform, dependsOn: 'jar') {
definitionClass = 'net.corda.notarydemo.RaftNotaryCordform'
directory file("$buildDir/nodes/nodesRaft")
nodeDefaults {
extraConfig = [h2Settings: [address: "localhost:0"]]
node {
name "O=Alice Corp,L=Madrid,C=ES"
p2pPort 10002
rpcSettings {
address "localhost:10003"
adminAddress "localhost:10103"
rpcUsers = [[user: "demou", password: "demop", permissions: ["ALL"]]]
node {
name "O=Bob Plc,L=Rome,C=IT"
p2pPort 10005
rpcSettings {
address "localhost:10006"
adminAddress "localhost:10106"
node {
name "O=Notary Service 0,L=Zurich,C=CH"
p2pPort 10009
rpcSettings {
address "localhost:10010"
adminAddress "localhost:10110"
notary = [
validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH",
raft: [
nodeAddress: "localhost:10008"
node {
name "O=Notary Service 1,L=Zurich,C=CH"
p2pPort 10013
rpcSettings {
address "localhost:10014"
adminAddress "localhost:10114"
notary = [
validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH",
raft: [
nodeAddress: "localhost:10012",
clusterAddresses: ["localhost:10008"]
node {
name "O=Notary Service 2,L=Zurich,C=CH"
p2pPort 10017
rpcSettings {
address "localhost:10018"
adminAddress "localhost:10118"
notary = [
validating: true,
serviceLegalName: "O=Raft,L=Zurich,C=CH",
raft: [
nodeAddress: "localhost:10016",
clusterAddresses: ["localhost:10008"]
task deployNodesBFT(type: Cordform, dependsOn: 'jar') {
definitionClass = 'net.corda.notarydemo.BFTNotaryCordform'
def clusterAddresses = ["localhost:11000", "localhost:11010", "localhost:11020", "localhost:11030"]
directory file("$buildDir/nodes/nodesBFT")
nodeDefaults {
extraConfig = [h2Settings: [address: "localhost:0"]]
node {
name "O=Alice Corp,L=Madrid,C=ES"
p2pPort 10002
rpcSettings {
address "localhost:10003"
adminAddress "localhost:10103"
rpcUsers = [[user: "demou", password: "demop", permissions: ["ALL"]]]
node {
name "O=Bob Plc,L=Rome,C=IT"
p2pPort 10005
rpcSettings {
address "localhost:10006"
adminAddress "localhost:10106"
node {
name "O=Notary Service 0,L=Zurich,C=CH"
p2pPort 10009
rpcSettings {
address "localhost:10010"
adminAddress "localhost:10110"
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [
replicaId: 0,
clusterAddresses: clusterAddresses
node {
name "O=Notary Service 1,L=Zurich,C=CH"
p2pPort 10013
rpcSettings {
address "localhost:10014"
adminAddress "localhost:10114"
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [
replicaId: 0,
clusterAddresses: clusterAddresses
node {
name "O=Notary Service 2,L=Zurich,C=CH"
p2pPort 10017
rpcSettings {
address "localhost:10018"
adminAddress "localhost:10118"
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [
replicaId: 0,
clusterAddresses: clusterAddresses
node {
name "O=Notary Service 3,L=Zurich,C=CH"
p2pPort 10021
rpcSettings {
address "localhost:10022"
adminAddress "localhost:10122"
notary = [
validating: false,
serviceLegalName: "O=BFT,L=Zurich,C=CH",
bftSMaRt: [
replicaId: 0,
clusterAddresses: clusterAddresses
task notarise(type: JavaExec) {
@ -1,103 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.notarydemo
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.transactions.minCorrectReplicas
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.testing.node.internal.demorun.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import java.nio.file.Paths
fun main(args: Array<String>) = BFTNotaryCordform().nodeRunner().deployAndRunNodes()
private const val clusterSize = 4 // Minimum size that tolerates a faulty replica.
private val notaryNames = createNotaryNames(clusterSize)
// This is not the intended final design for how to use CordformDefinition, please treat this as experimental and DO
// NOT use this as a design to copy.
class BFTNotaryCordform : CordformDefinition() {
private val clusterName = CordaX500Name("BFT", "Zurich", "CH")
init {
nodesDirectory = Paths.get("build", "nodes", "nodesBFT")
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
val clusterAddresses = (0 until clusterSize).map { NetworkHostAndPort("localhost", 11000 + it * 10) }
fun notaryNode(replicaId: Int, configure: CordformNode.() -> Unit) = node {
notary(NotaryConfig(validating = false, serviceLegalName = clusterName, bftSMaRt = BFTSMaRtConfiguration(replicaId, clusterAddresses)))
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
notaryNode(0) {
rpcSettings {
notaryNode(1) {
rpcSettings {
notaryNode(2) {
rpcSettings {
notaryNode(3) {
rpcSettings {
override fun setup(context: CordformContext) {
@ -1,19 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.notarydemo
import net.corda.testing.node.internal.demorun.nodeRunner
fun main(args: Array<String>) {
listOf(SingleNotaryCordform(), RaftNotaryCordform(), BFTNotaryCordform()).map { it.nodeRunner() }.forEach {
@ -1,62 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.notarydemo
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.node.services.config.NotaryConfig
import net.corda.testing.node.internal.demorun.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import java.nio.file.Paths
fun main(args: Array<String>) = CustomNotaryCordform().nodeRunner().deployAndRunNodes()
class CustomNotaryCordform : CordformDefinition() {
init {
nodesDirectory = Paths.get("build", "nodes", "nodesCustom")
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
node {
rpcSettings {
notary(NotaryConfig(validating = true, custom = true))
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
override fun setup(context: CordformContext) {}
@ -27,7 +27,7 @@ import java.util.concurrent.Future
fun main(args: Array<String>) {
val address = NetworkHostAndPort("localhost", 10003)
println("Connecting to the recipient node ($address)")
CordaRPCClient(address).start(notaryDemoUser.username, notaryDemoUser.password).use {
CordaRPCClient(address).start("demou", "demop").use {
@ -1,96 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.notarydemo
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.RaftConfig
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.node.internal.demorun.*
import java.nio.file.Paths
fun main(args: Array<String>) = RaftNotaryCordform().nodeRunner().deployAndRunNodes()
internal fun createNotaryNames(clusterSize: Int) = (0 until clusterSize).map { CordaX500Name("Notary Service $it", "Zurich", "CH") }
private val notaryNames = createNotaryNames(3)
// This is not the intended final design for how to use CordformDefinition, please treat this as experimental and DO
// NOT use this as a design to copy.
class RaftNotaryCordform : CordformDefinition() {
private val clusterName = CordaX500Name("Raft", "Zurich", "CH")
init {
nodesDirectory = Paths.get("build", "nodes", "nodesRaft")
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
fun notaryNode(index: Int, nodePort: Int, clusterPort: Int? = null, configure: CordformNode.() -> Unit) = node {
val clusterAddresses = if (clusterPort != null) listOf(NetworkHostAndPort("localhost", clusterPort)) else emptyList()
notary(NotaryConfig(validating = true, serviceLegalName = clusterName, raft = RaftConfig(NetworkHostAndPort("localhost", nodePort), clusterAddresses)))
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
notaryNode(0, 10008) {
rpcSettings {
notaryNode(1, 10012, 10008) {
rpcSettings {
notaryNode(2, 10016, 10008) {
rpcSettings {
override fun setup(context: CordformContext) {
@ -1,68 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.notarydemo
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.node.services.Permissions.Companion.all
import net.corda.node.services.config.NotaryConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.node.User
import net.corda.testing.node.internal.demorun.*
import java.nio.file.Paths
fun main(args: Array<String>) = SingleNotaryCordform().nodeRunner().deployAndRunNodes()
val notaryDemoUser = User("demou", "demop", setOf(all()))
// This is not the intended final design for how to use CordformDefinition, please treat this as experimental and DO
// NOT use this as a design to copy.
class SingleNotaryCordform : CordformDefinition() {
init {
nodesDirectory = Paths.get("build", "nodes", "nodesSingle")
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
node {
rpcSettings {
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
node {
rpcSettings {
notary(NotaryConfig(validating = true))
extraConfig = mapOf("h2Settings" to mapOf("address" to "localhost:0"))
override fun setup(context: CordformContext) {}
@ -16,6 +16,7 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.KeepForDJVM
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.buildNamed
import net.corda.core.internal.copyBytes
import net.corda.core.serialization.*
import net.corda.core.utilities.ByteSequence
@ -86,7 +87,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
internal class AttachmentsClassLoaderBuilder(private val properties: Map<Any, Any>, private val deserializationClassLoader: ClassLoader) {
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build()
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).buildNamed("SerializationScheme_attachmentClassloader")
fun build(attachmentHashes: List<SecureHash>): AttachmentsClassLoader? {
val serializationContext = properties[serializationContextKey] as? SerializeAsTokenContext ?: return null // Some tests don't set one.
@ -111,7 +111,7 @@ class DeserializationInput constructor(
} catch (nse: NotSerializableException) {
throw nse
} catch (t: Throwable) {
throw NotSerializableException("Unexpected throwable: ${t.message}").apply { initCause(t) }
throw NotSerializableException("Internal deserialization failure: ${t.javaClass.name}: ${t.message}").apply { initCause(t) }
} finally {
@ -36,7 +36,6 @@ sourceSets {
dependencies {
compile project(':test-utils')
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
// Integration test helpers
testCompile "org.assertj:assertj-core:$assertj_version"
@ -10,15 +10,12 @@
package net.corda.testing.node.internal
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigRenderOptions
import com.typesafe.config.ConfigValueFactory
import net.corda.client.rpc.internal.createCordaRPCClientWithInternalSslAndClassLoader
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
import net.corda.core.identity.CordaX500Name
@ -40,6 +37,7 @@ import net.corda.node.utilities.registration.NodeRegistrationHelper
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.config.toConfig
@ -60,8 +58,6 @@ import net.corda.testing.internal.setGlobalSerialization
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.User
import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_RAFT
import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT
import net.corda.testing.node.internal.DriverDSLImpl.Companion.cordappsInCurrentAndAdditionalPackages
import okhttp3.OkHttpClient
import okhttp3.Request
@ -344,97 +340,6 @@ class DriverDSLImpl(
NON_VALIDATING_BFT(false, CordaX500Name("BFT", "Zurich", "CH"))
// TODO remove this
internal fun startCordformNodes(cordforms: List<CordformNode>): CordaFuture<*> {
check(notarySpecs.isEmpty()) { "Specify notaries in the CordformDefinition" }
check(compatibilityZone == null) { "Cordform nodes cannot be run with compatibilityZoneURL" }
val clusterNodes = HashMultimap.create<ClusterType, CordaX500Name>()
val notaryInfos = ArrayList<NotaryInfo>()
// Go though the node definitions and pick out the notaries so that we can generate their identities to be used
// in the network parameters
for (cordform in cordforms) {
if (cordform.notary == null) continue
val name = CordaX500Name.parse(cordform.name)
val notaryConfig = ConfigFactory.parseMap(cordform.notary).parseAs<NotaryConfig>()
// We need to first group the nodes that form part of a cluster. We assume for simplicity that nodes of the
// same cluster type and validating flag are part of the same cluster.
when {
notaryConfig.raft != null -> {
val key = if (notaryConfig.validating) VALIDATING_RAFT else NON_VALIDATING_RAFT
clusterNodes.put(key, name)
notaryConfig.bftSMaRt != null -> clusterNodes.put(ClusterType.NON_VALIDATING_BFT, name)
else -> {
// We have all we need here to generate the identity for single node notaries
val identity = DevIdentityGenerator.installKeyStoreWithNodeIdentity(baseDirectory(name), legalName = name)
notaryInfos += NotaryInfo(identity, notaryConfig.validating)
clusterNodes.asMap().forEach { type, nodeNames ->
val identity = if (type == ClusterType.NON_VALIDATING_RAFT || type == ClusterType.VALIDATING_RAFT) {
dirs = nodeNames.map { baseDirectory(it) },
notaryName = type.clusterName
} else {
dirs = nodeNames.map { baseDirectory(it) },
notaryName = type.clusterName
notaryInfos += NotaryInfo(identity, type.validating)
val localNetworkMap = LocalNetworkMap(notaryInfos)
return cordforms.map {
val startedNode = startCordformNode(it, localNetworkMap)
if (it.webAddress != null) {
// Start a webserver if an address for it was specified
startedNode.flatMap { startWebserver(it) }
} else {
// TODO remove this
private fun startCordformNode(cordform: CordformNode, localNetworkMap: LocalNetworkMap): CordaFuture<NodeHandle> {
val name = CordaX500Name.parse(cordform.name)
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
val rpcAddress = if (cordform.rpcAddress == null) {
val overrides = mutableMapOf<String, Any>("rpcSettings.address" to portAllocation.nextHostAndPort().toString())
cordform.config.apply {
overrides += "rpcSettings.adminAddress" to portAllocation.nextHostAndPort().toString()
} else {
val overrides = mutableMapOf<String, Any>()
cordform.config.apply {
overrides += "rpcSettings.adminAddress" to portAllocation.nextHostAndPort().toString()
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
val rpcUsers = cordform.rpcUsers
val rawConfig = cordform.config + rpcAddress + notary + mapOf(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
val typesafe = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = rawConfig.toNodeOnly()
val config = NodeConfig(typesafe).checkAndOverrideForInMemoryDB()
return startNodeInternal(config, webAddress, null, "512m", localNetworkMap, emptySet())
private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle {
val protocol = if ((handle as NodeHandleInternal).useHTTPS) "https://" else "http://"
@ -1045,7 +950,7 @@ private class NetworkVisibilityController {
interface InternalDriverDSL : DriverDSL, CordformContext {
interface InternalDriverDSL : DriverDSL {
private companion object {
private val DEFAULT_POLL_INTERVAL = 500.millis
private const val DEFAULT_WARN_COUNT = 120
@ -1053,7 +958,7 @@ interface InternalDriverDSL : DriverDSL, CordformContext {
val shutdownManager: ShutdownManager
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
* Polls a function until it returns a non-null value. Note that there is no timeout on the polling.
@ -1287,4 +1192,3 @@ fun DriverDSL.startNode(providedName: CordaX500Name, devMode: Boolean, parameter
return startNode(parameters, providedName = providedName, customOverrides = customOverrides)
@ -1,88 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.testing.node.internal.demorun
import net.corda.cordform.CordformDefinition
import net.corda.cordform.CordformNode
import net.corda.core.internal.deleteRecursively
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.testing.driver.JmxPolicy
import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.internal.DriverDSLImpl.Companion.cordappsInCurrentAndAdditionalPackages
import net.corda.testing.node.internal.internalDriver
* Creates a demo runner for this cordform definition
fun CordformDefinition.nodeRunner() = CordformNodeRunner(this)
* A node runner creates and runs nodes for a given [[CordformDefinition]].
class CordformNodeRunner(private val cordformDefinition: CordformDefinition) {
private var extraPackagesToScan = emptyList<String>()
* Builder method to sets the extra cordapp scan packages
fun scanPackages(packages: List<String>): CordformNodeRunner {
extraPackagesToScan = packages
return this
fun clean() {
System.err.println("Deleting: ${cordformDefinition.nodesDirectory}")
* Deploy the nodes specified in the given [CordformDefinition]. This will block until all the nodes and webservers
* have terminated.
fun deployAndRunNodes() {
runNodes(waitForAllNodesToFinish = true) { }
* Deploy the nodes specified in the given [CordformDefinition] and then execute the given [block] once all the nodes
* and webservers are up. After execution all these processes will be terminated.
fun deployAndRunNodesAndThen(block: () -> Unit) {
runNodes(waitForAllNodesToFinish = false, block = block)
private fun runNodes(waitForAllNodesToFinish: Boolean, block: () -> Unit) {
val nodes = cordformDefinition.nodeConfigurers.map { configurer -> CordformNode().also { configurer.accept(it) } }
val maxPort = nodes
.flatMap { listOf(it.p2pAddress, it.rpcAddress, it.webAddress) }
.mapNotNull { address -> address?.let { NetworkHostAndPort.parse(it).port } }
jmxPolicy = JmxPolicy(true),
driverDirectory = cordformDefinition.nodesDirectory,
// Notaries are manually specified in Cordform so we don't want the driver automatically starting any
notarySpecs = emptyList(),
// Start from after the largest port used to prevent port clash
portAllocation = PortAllocation.Incremental(maxPort + 1),
waitForAllNodesToFinish = waitForAllNodesToFinish,
cordappsForAllNodes = cordappsInCurrentAndAdditionalPackages(extraPackagesToScan)
) {
startCordformNodes(nodes).getOrThrow() // Only proceed once everything is up and running
println("All nodes and webservers are ready...")
@ -1,44 +0,0 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.testing.node.internal.demorun
import net.corda.cordform.CordformDefinition
import net.corda.cordform.CordformNode
import net.corda.cordform.RpcSettings
import net.corda.cordform.SslOptions
import net.corda.core.identity.CordaX500Name
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.config.toConfig
import net.corda.testing.node.User
fun CordformDefinition.node(configure: CordformNode.() -> Unit) {
addNode { cordformNode -> cordformNode.configure() }
fun CordformNode.name(name: CordaX500Name) = name(name.toString())
fun CordformNode.rpcUsers(vararg users: User) {
rpcUsers = users.map { it.toConfig().root().unwrapped() }
fun CordformNode.notary(notaryConfig: NotaryConfig) {
notary = notaryConfig.toConfig().root().unwrapped()
fun CordformNode.rpcSettings(configure: RpcSettings.() -> Unit) {
fun RpcSettings.ssl(configure: SslOptions.() -> Unit) {
@ -66,8 +66,6 @@ dependencies {
// Controls FX: more java FX components http://fxexperience.com/controlsfx/
compile "org.controlsfx:controlsfx:$controlsfx_version"
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
compile project(':client:rpc')
compile project(':finance')
@ -24,6 +24,7 @@ import javafx.scene.image.WritableImage
import javafx.scene.paint.Color
import javafx.scene.text.TextAlignment
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.buildNamed
* (The MIT License)
@ -85,7 +86,7 @@ object IdenticonRenderer {
private const val renderingSize = 30.0
private val cache = Caffeine.newBuilder().build(CacheLoader<SecureHash, Image> { key ->
private val cache = Caffeine.newBuilder().buildNamed("IdentIconRenderer_image", CacheLoader<SecureHash, Image> { key ->
key.let { render(key.hashCode(), renderingSize) }
@ -1,92 +1,87 @@
* R3 Proprietary and Confidential
* Copyright (c) 2018 R3 Limited. All rights reserved.
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
package net.corda.notarytest
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformDefinition
import net.corda.cordform.CordformNode
import net.corda.core.identity.CordaX500Name
import net.corda.node.services.Permissions
import net.corda.node.services.config.NotaryConfig
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.testing.node.User
import net.corda.testing.node.internal.demorun.*
fun main(args: Array<String>) = JDBCNotaryCordform().nodeRunner().deployAndRunNodes()
internal val notaryDemoUser = User("demou", "demop", setOf(Permissions.all()))
class JDBCNotaryCordform : CordformDefinition() {
private val clusterName = CordaX500Name("Mysql Notary", "Zurich", "CH")
private val notaryNames = createNotaryNames(3)
private fun createNotaryNames(clusterSize: Int) = (0 until clusterSize).map {
CordaX500Name("Notary Service $it", "Zurich", "CH")
init {
fun notaryNode(index: Int, configure: CordformNode.() -> Unit) = node {
validating = true,
custom = true
extraConfig = mapOf("custom" to
"mysql" to mapOf(
"dataSource" to mapOf(
// Update the db address/port accordingly
"jdbcUrl" to "jdbc:mysql://localhost:330${6 + index}/corda?rewriteBatchedStatements=true&useSSL=false&failOverReadOnly=false",
"username" to "corda",
"password" to "awesome",
"autoCommit" to "false")
"graphiteAddress" to "performance-metrics.northeurope.cloudapp.azure.com:2004"
notaryNode(0) {
rpcSettings {
notaryNode(1) {
rpcSettings {
notaryNode(2) {
rpcSettings {
override fun setup(context: CordformContext) {
notaryNames.map { context.baseDirectory(it.toString()) },
// * R3 Proprietary and Confidential
// *
// * Copyright (c) 2018 R3 Limited. All rights reserved.
// *
// * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
// *
// * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
// */
//package net.corda.notarytest
//import net.corda.cordform.CordformContext
//import net.corda.cordform.CordformDefinition
//import net.corda.cordform.CordformNode
//import net.corda.core.identity.CordaX500Name
//import net.corda.node.services.config.NotaryConfig
//import net.corda.nodeapi.internal.DevIdentityGenerator
//fun main(args: Array<String>) = JDBCNotaryCordform().nodeRunner().deployAndRunNodes()
//class JDBCNotaryCordform : CordformDefinition() {
// private val clusterName = CordaX500Name("Mysql Notary", "Zurich", "CH")
// private val notaryNames = createNotaryNames(3)
// private fun createNotaryNames(clusterSize: Int) = (0 until clusterSize).map {
// CordaX500Name("Notary Service $it", "Zurich", "CH")
// }
// init {
// fun notaryNode(index: Int, configure: CordformNode.() -> Unit) = node {
// name(notaryNames[index])
// notary(
// NotaryConfig(
// validating = true,
// custom = true
// )
// )
// extraConfig = mapOf("custom" to
// mapOf(
// "mysql" to mapOf(
// "dataSource" to mapOf(
// // Update the db address/port accordingly
// "jdbcUrl" to "jdbc:mysql://localhost:330${6 + index}/corda?rewriteBatchedStatements=true&useSSL=false&failOverReadOnly=false",
// "username" to "corda",
// "password" to "awesome",
// "autoCommit" to "false")
// ),
// "graphiteAddress" to "performance-metrics.northeurope.cloudapp.azure.com:2004"
// )
// )
// configure()
// }
// notaryNode(0) {
// p2pPort(10009)
// rpcSettings {
// address("localhost:10010")
// adminAddress("localhost:10110")
// }
// rpcUsers(notaryDemoUser)
// }
// notaryNode(1) {
// p2pPort(10013)
// rpcSettings {
// address("localhost:10014")
// adminAddress("localhost:10114")
// }
// rpcUsers(notaryDemoUser)
// }
// notaryNode(2) {
// p2pPort(10017)
// rpcSettings {
// address("localhost:10018")
// adminAddress("localhost:10118")
// }
// rpcUsers(notaryDemoUser)
// }
// }
// override fun setup(context: CordformContext) {
// DevIdentityGenerator.generateDistributedNotarySingularIdentity(
// notaryNames.map { context.baseDirectory(it.toString()) },
// clusterName
// )
// }
@ -15,7 +15,9 @@ import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.Permissions
import net.corda.notarytest.service.JDBCLoadTestFlow
import net.corda.testing.node.User
import java.util.concurrent.TimeUnit
/** The number of test flows to run on each notary node */
@ -62,3 +64,5 @@ private fun run(rpc: CordaRPCOps, inputStateCount: Int? = null): List<Long> {
internal val notaryDemoUser = User("demou", "demop", setOf(Permissions.all()))
Reference in New Issue
Block a user