mirror of
synced 2025-02-21 09:51:57 +00:00
driver, node: Allow in-process starting of nodes, add a couple of node performance tests
This commit is contained in:
@ -1,34 +1,27 @@
package net.corda.client.rpc
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.Gauge
import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.messaging.RPCOps
import net.corda.core.minutes
import net.corda.core.seconds
import net.corda.core.utilities.Rate
import net.corda.core.utilities.div
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.RPCDriverExposedDSLInterface
import net.corda.testing.driver.ShutdownManager
import net.corda.testing.measure
import net.corda.testing.performance.startPublishingFixedRateInjector
import net.corda.testing.performance.startReporter
import net.corda.testing.performance.startTightLoopInjector
import net.corda.testing.rpcDriver
import org.junit.Ignore
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.time.Duration
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kotlin.concurrent.thread
import kotlin.concurrent.withLock
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
@Ignore("Only use this locally for profiling")
@ -106,10 +99,9 @@ class RPCPerformanceTests : AbstractRPCTest() {
val numberOfRequests = overallTraffic / (2 * inputOutputSize)
val timings = Collections.synchronizedList(ArrayList<Long>())
val executor = Executors.newFixedThreadPool(8)
val totalElapsed = Stopwatch.createStarted().apply {
executor = executor,
parallelism = 8,
numberOfInjections = numberOfRequests.toInt(),
queueBound = 100
) {
@ -119,7 +111,6 @@ class RPCPerformanceTests : AbstractRPCTest() {
requestPerSecond = 1000000.0 * numberOfRequests.toDouble() / totalElapsed.toDouble(),
averageIndividualMs = timings.average() / 1000.0,
@ -135,7 +126,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
fun `consumption rate`() {
rpcDriver {
val metricRegistry = startReporter()
val metricRegistry = startReporter(shutdownManager)
val proxy = testProxy(
reapInterval = 1.seconds,
@ -148,14 +139,13 @@ class RPCPerformanceTests : AbstractRPCTest() {
producerPoolBound = 8
metricRegistry = metricRegistry,
parallelism = 8,
overallDuration = 5.minutes,
injectionRate = 20000L / TimeUnit.SECONDS,
queueSizeMetricName = "$mode.QueueSize",
workDurationMetricName = "$mode.WorkDuration",
shutdownManager = this.shutdownManager,
work = {
proxy.ops.simpleReply(ByteArray(4096), 4096)
@ -177,19 +167,17 @@ class RPCPerformanceTests : AbstractRPCTest() {
consumerPoolSize = 1
val executor = Executors.newFixedThreadPool(clientParallelism)
val numberOfMessages = 1000
val bigSize = 10_000_000
val elapsed = Stopwatch.createStarted().apply {
executor = executor,
parallelism = clientParallelism,
numberOfInjections = numberOfMessages,
queueBound = 4
) {
proxy.ops.simpleReply(ByteArray(bigSize), 0)
Mbps = bigSize.toDouble() * numberOfMessages.toDouble() / elapsed * (1000000.0 / (1024.0 * 1024.0))
@ -197,120 +185,3 @@ class RPCPerformanceTests : AbstractRPCTest() {
fun measurePerformancePublishMetrics(
metricRegistry: MetricRegistry,
parallelism: Int,
overallDuration: Duration,
injectionRate: Rate,
queueSizeMetricName: String,
workDurationMetricName: String,
shutdownManager: ShutdownManager,
work: () -> Unit
) {
val workSemaphore = Semaphore(0)
metricRegistry.register(queueSizeMetricName, Gauge { workSemaphore.availablePermits() })
val workDurationTimer = metricRegistry.timer(workDurationMetricName)
val executor = Executors.newSingleThreadScheduledExecutor()
val workExecutor = Executors.newFixedThreadPool(parallelism)
val timings = Collections.synchronizedList(ArrayList<Long>())
for (i in 1 .. parallelism) {
workExecutor.submit {
try {
while (true) {
workDurationTimer.time {
Stopwatch.createStarted().apply {
} catch (throwable: Throwable) {
val injector = executor.scheduleAtFixedRate(
workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt())
shutdownManager.registerShutdown {
workExecutor.awaitTermination(1, TimeUnit.SECONDS)
executor.awaitTermination(1, TimeUnit.SECONDS)
fun startInjectorWithBoundedQueue(
executor: ExecutorService,
numberOfInjections: Int,
queueBound: Int,
work: () -> Unit
) {
val remainingLatch = CountDownLatch(numberOfInjections)
val queuedCount = AtomicInteger(0)
val lock = ReentrantLock()
val canQueueAgain = lock.newCondition()
val injectorShutdown = AtomicBoolean(false)
val injector = thread(name = "injector") {
while (true) {
if (injectorShutdown.get()) break
executor.submit {
if (queuedCount.decrementAndGet() < queueBound / 2) {
lock.withLock {
if (queuedCount.incrementAndGet() > queueBound) {
lock.withLock {
fun RPCDriverExposedDSLInterface.startReporter(): MetricRegistry {
val metricRegistry = MetricRegistry()
val jmxReporter = thread {
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
val consoleReporter = thread {
ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS)
shutdownManager.registerShutdown {
return metricRegistry
@ -0,0 +1,125 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.google.common.base.Stopwatch
import com.google.common.util.concurrent.Futures
import net.corda.core.contracts.DOLLARS
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.minutes
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.utilities.div
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.performance.startPublishingFixedRateInjector
import net.corda.testing.performance.startReporter
import net.corda.testing.performance.startTightLoopInjector
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.lang.management.ManagementFactory
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.streams.toList
private fun checkQuasarAgent() {
if (!(ManagementFactory.getRuntimeMXBean().inputArguments.any { it.contains("quasar") })) {
throw IllegalStateException("No quasar agent")
@Ignore("Run these locally")
class NodePerformanceTests {
class EmptyFlow : FlowLogic<Unit>() {
override fun call() {
private data class FlowMeasurementResult(
val flowPerSecond: Double,
val averageMs: Double
fun before() {
fun `empty flow per second`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<EmptyFlow>())))).get()
a.rpcClientToNode().use("A", "A") { connection ->
val timings = Collections.synchronizedList(ArrayList<Long>())
val N = 10000
val overallTiming = Stopwatch.createStarted().apply {
parallelism = 8,
numberOfInjections = N,
queueBound = 50
) {
val timing = Stopwatch.createStarted().apply {
flowPerSecond = N / (overallTiming * 0.000001),
averageMs = timings.average() * 0.001
fun `empty flow rate`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<EmptyFlow>())))).get()
a as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics)
a.rpcClientToNode().use("A", "A") { connection ->
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 2000L / TimeUnit.SECONDS) {
fun `self pay rate`() {
driver(startNodesInProcess = true) {
val a = startNode(
rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<CashIssueFlow>(), startFlowPermission<CashPaymentFlow>()))),
advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))
a as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics)
a.rpcClientToNode().use("A", "A") { connection ->
val doneFutures = (1..100).toList().parallelStream().map {
connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), a.nodeInfo.legalIdentity, a.nodeInfo.notaryIdentity).returnValue
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 100L / TimeUnit.SECONDS) {
connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, a.nodeInfo.legalIdentity).returnValue.get()
@ -29,7 +29,7 @@ import kotlin.test.assertEquals
class DistributedServiceTests : DriverBasedTest() {
lateinit var alice: NodeHandle
lateinit var notaries: List<NodeHandle>
lateinit var notaries: List<NodeHandle.OutOfProcess>
lateinit var aliceProxy: CordaRPCOps
lateinit var raftNotaryIdentity: Party
lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>>
@ -52,7 +52,7 @@ class DistributedServiceTests : DriverBasedTest() {
alice = aliceFuture.get()
val (notaryIdentity, notaryNodes) = notariesFuture.get()
raftNotaryIdentity = notaryIdentity
notaries = notaryNodes
notaries = notaryNodes.map { it as NodeHandle.OutOfProcess }
assertEquals(notaries.size, clusterSize)
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)
@ -25,10 +25,10 @@ class DriverTests {
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
private fun nodeMustBeUp(handleFuture: ListenableFuture<NodeHandle>) = handleFuture.getOrThrow().apply {
private fun nodeMustBeUp(handleFuture: ListenableFuture<out NodeHandle>) = handleFuture.getOrThrow().apply {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the port is bound
addressMustBeBound(executorService, hostAndPort, process)
addressMustBeBound(executorService, hostAndPort, (this as? NodeHandle.OutOfProcess)?.process)
private fun nodeMustBeDown(handle: NodeHandle) {
@ -224,6 +224,7 @@ fun <A> rpcDriver(
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
dsl: RPCDriverExposedDSLInterface.() -> A
) = genericDriver(
driverDsl = RPCDriverDSL(
@ -234,7 +235,8 @@ fun <A> rpcDriver(
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug
isDebug = isDebug,
startNodesInProcess = startNodesInProcess
coerce = { it },
@ -19,17 +19,21 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.serialization.NodeClock
import net.corda.node.services.config.*
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.TestClock
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.parseAs
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.MOCK_VERSION_INFO
import okhttp3.OkHttpClient
import okhttp3.Request
import org.bouncycastle.asn1.x500.X500Name
@ -38,6 +42,7 @@ import java.io.File
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
@ -47,6 +52,7 @@ import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
@ -54,6 +60,8 @@ import java.util.concurrent.atomic.AtomicInteger
* The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes.
* TODO this file is getting way too big, it should be split into several files.
private val log: Logger = loggerFor<DriverDSL>()
@ -66,19 +74,25 @@ interface DriverDSLExposedInterface : CordformContext {
* Starts a [net.corda.node.internal.Node] in a separate process.
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* random. Note that this must be unique as the driver uses it as a primary key!
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
fun startNode(providedName: X500Name? = null,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
verifierType: VerifierType = VerifierType.InMemory,
customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle>
customOverrides: Map<String, Any?> = emptyMap(),
startInSameProcess: Boolean? = null): ListenableFuture<out NodeHandle>
fun startNodes(nodes: List<CordformNode>): List<ListenableFuture<NodeHandle>>
fun startNodes(
nodes: List<CordformNode>,
startInSameProcess: Boolean? = null
): List<ListenableFuture<out NodeHandle>>
* Starts a distributed notary cluster.
@ -88,6 +102,8 @@ interface DriverDSLExposedInterface : CordformContext {
* @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type].
* @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
fun startNotaryCluster(
@ -95,7 +111,8 @@ interface DriverDSLExposedInterface : CordformContext {
clusterSize: Int = 3,
type: ServiceType = RaftValidatingNotaryService.type,
verifierType: VerifierType = VerifierType.InMemory,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
rpcUsers: List<User> = emptyList(),
startInSameProcess: Boolean? = null): ListenableFuture<Pair<Party, List<NodeHandle>>>
* Starts a web server for a node
@ -107,8 +124,10 @@ interface DriverDSLExposedInterface : CordformContext {
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
* to set networkMapStartStrategy to Dedicated(false) in your [driver] call.
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
* value will be used.
fun startDedicatedNetworkMapService(): ListenableFuture<Unit>
fun startDedicatedNetworkMapService(startInProcess: Boolean? = null): ListenableFuture<out NodeHandle>
fun waitForAllNodesToFinish()
@ -138,13 +157,30 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun shutdown()
data class NodeHandle(
val nodeInfo: NodeInfo,
val rpc: CordaRPCOps,
val configuration: FullNodeConfiguration,
val webAddress: HostAndPort,
val process: Process
) {
sealed class NodeHandle {
abstract val nodeInfo: NodeInfo
abstract val rpc: CordaRPCOps
abstract val configuration: FullNodeConfiguration
abstract val webAddress: HostAndPort
data class OutOfProcess(
override val nodeInfo: NodeInfo,
override val rpc: CordaRPCOps,
override val configuration: FullNodeConfiguration,
override val webAddress: HostAndPort,
val debugPort: Int?,
val process: Process
) : NodeHandle()
data class InProcess(
override val nodeInfo: NodeInfo,
override val rpc: CordaRPCOps,
override val configuration: FullNodeConfiguration,
override val webAddress: HostAndPort,
val node: Node,
val nodeThread: Thread
) : NodeHandle()
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!)
@ -186,14 +222,15 @@ sealed class PortAllocation {
* The driver implicitly bootstraps a [NetworkMapService].
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging.
* @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
* and may be specified in [DriverDSL.startNode].
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty.
* @param useTestClock If true the test clock will be used in Node.
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging.
* @param networkMapStartStrategy Determines whether a network map node is started automatically.
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
* @param dsl The dsl itself.
* @return The value returned in the [dsl] closure.
@ -206,6 +243,7 @@ fun <A> driver(
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true),
startNodesInProcess: Boolean = false,
dsl: DriverDSLExposedInterface.() -> A
) = genericDriver(
driverDsl = DriverDSL(
@ -215,6 +253,7 @@ fun <A> driver(
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy,
startNodesInProcess = startNodesInProcess,
isDebug = isDebug
coerce = { it },
@ -259,13 +298,13 @@ class ListenProcessDeathException(message: String) : Exception(message)
* @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended.
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process) {
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null) {
addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow()
fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process): ListenableFuture<Unit> {
fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null): ListenableFuture<Unit> {
return poll(executorService, "address $hostAndPort to bind") {
if (!listenProcess.isAlive) {
if (listenProcess != null && !listenProcess.isAlive) {
throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")
try {
@ -337,6 +376,19 @@ class ShutdownManager(private val executorService: ExecutorService) {
private val state = ThreadBox(State())
companion object {
inline fun <A> run(providedExecutorService: ExecutorService? = null, block: ShutdownManager.() -> A): A {
val executorService = providedExecutorService ?: Executors.newScheduledThreadPool(1)
val shutdownManager = ShutdownManager(executorService)
try {
return block(shutdownManager)
} finally {
providedExecutorService ?: executorService.shutdown()
fun shutdown() {
val shutdownFutures = state.locked {
if (isShutdown) {
@ -421,13 +473,15 @@ class DriverDSL(
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val networkMapStartStrategy: NetworkMapStartStrategy
val networkMapStartStrategy: NetworkMapStartStrategy,
val startNodesInProcess: Boolean
) : DriverDSLInternalInterface {
private val dedicatedNetworkMapAddress = portAllocation.nextHostAndPort()
var _executorService: ListeningScheduledExecutorService? = null
private var _executorService: ListeningScheduledExecutorService? = null
val executorService get() = _executorService!!
var _shutdownManager: ShutdownManager? = null
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private var callerPackage: String? = null
class State {
val processes = ArrayList<ListenableFuture<Process>>()
@ -500,57 +554,50 @@ class DriverDSL(
advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> {
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?
): ListenableFuture<out NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: X509Utilities.getX509Name("${oneOf(names).commonName}-${p2pAddress.port}","London","demo@r3.com",null)
return startNode(p2pAddress, webAddress, name, configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"extraAdvertisedServiceIds" to advertisedServices.map { it.toString() },
"networkMapService" to networkMapServiceConfigLookup(emptyList())(name),
"useTestClock" to useTestClock,
"rpcUsers" to rpcUsers.map { it.toMap() },
"verifierType" to verifierType.name
) + customOverrides)
private fun startNode(p2pAddress: HostAndPort, webAddress: HostAndPort, nodeName: X500Name, configOverrides: Config) = run {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(nodeName),
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = configOverrides)
val configuration = config.parseAs<FullNodeConfiguration>()
val processFuture = startNode(executorService, configuration, config, quasarJarPath, debugPort, systemProperties)
processFuture.flatMap { process ->
// We continue to use SSL enabled port for RPC when its for node user.
establishRpc(p2pAddress, configuration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle(rpc.nodeIdentity(), rpc, configuration, webAddress, process)
configOverrides = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"extraAdvertisedServiceIds" to advertisedServices.map { it.toString() },
"networkMapService" to networkMapServiceConfigLookup(emptyList())(name),
"useTestClock" to useTestClock,
"rpcUsers" to rpcUsers.map { it.toMap() },
"verifierType" to verifierType.name
) + customOverrides
return startNodeInternal(config, webAddress, startInSameProcess)
override fun startNodes(nodes: List<CordformNode>): List<ListenableFuture<NodeHandle>> {
override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?): List<ListenableFuture<out NodeHandle>> {
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
return nodes.map {
val p2pAddress = HostAndPort.fromString(it.config.getString("p2pAddress")); portAllocation.nextHostAndPort()
return nodes.map { node ->
portAllocation.nextHostAndPort() // rpcAddress
val webAddress = portAllocation.nextHostAndPort()
val name = X500Name(it.name)
startNode(p2pAddress, webAddress, name, it.config + mapOf(
"extraAdvertisedServiceIds" to it.advertisedServices,
"networkMapService" to networkMapServiceConfigLookup(name),
"rpcUsers" to it.rpcUsers,
"notaryClusterAddresses" to it.notaryClusterAddresses
val name = X500Name(node.name)
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = node.config + mapOf(
"extraAdvertisedServiceIds" to node.advertisedServices,
"networkMapService" to networkMapServiceConfigLookup(name),
"rpcUsers" to node.rpcUsers,
"notaryClusterAddresses" to node.notaryClusterAddresses
startNodeInternal(config, webAddress, startInSameProcess)
@ -559,7 +606,8 @@ class DriverDSL(
clusterSize: Int,
type: ServiceType,
verifierType: VerifierType,
rpcUsers: List<User>
rpcUsers: List<User>,
startInSameProcess: Boolean?
): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") }
val paths = nodeNames.map { baseDirectory(it) }
@ -568,7 +616,14 @@ class DriverDSL(
val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNotaryFuture = startNode(nodeNames.first(), advertisedServices, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
val firstNotaryFuture = startNode(
providedName = nodeNames.first(),
advertisedServices = advertisedServices,
rpcUsers = rpcUsers,
verifierType = verifierType,
customOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()),
startInSameProcess = startInSameProcess
// All other nodes will join the cluster
val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
@ -613,6 +668,12 @@ class DriverDSL(
Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
callerPackage = Exception()
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`.name }
// We set this property so that in-process nodes find cordapps. Out-of-process nodes need this passed in when started.
System.setProperty("net.corda.node.cordapp.scan.package", callerPackage)
if (networkMapStartStrategy.startDedicated) {
startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes.
@ -620,9 +681,8 @@ class DriverDSL(
override fun baseDirectory(nodeName: X500Name): Path = driverDirectory / nodeName.commonName.replace(WHITESPACE, "")
override fun startDedicatedNetworkMapService(): ListenableFuture<Unit> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val apiAddress = portAllocation.nextHostAndPort().toString()
override fun startDedicatedNetworkMapService(startInProcess: Boolean?): ListenableFuture<out NodeHandle> {
val webAddress = portAllocation.nextHostAndPort()
val networkMapLegalName = networkMapStartStrategy.legalName
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(networkMapLegalName),
@ -631,16 +691,45 @@ class DriverDSL(
"myLegalName" to networkMapLegalName.toString(),
// TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all
// node port numbers to be shifted, so all demos and docs need to be updated accordingly.
"webAddress" to apiAddress,
"webAddress" to webAddress.toString(),
"p2pAddress" to dedicatedNetworkMapAddress.toString(),
"useTestClock" to useTestClock
return startNodeInternal(config, webAddress, startInProcess)
log.info("Starting network-map-service")
val startNode = startNode(executorService, config.parseAs<FullNodeConfiguration>(), config, quasarJarPath, debugPort, systemProperties)
return startNode.flatMap { addressMustBeBoundFuture(executorService, dedicatedNetworkMapAddress, it) }
private fun startNodeInternal(config: Config, webAddress: HostAndPort, startInProcess: Boolean?): ListenableFuture<out NodeHandle> {
val nodeConfiguration = config.parseAs<FullNodeConfiguration>()
// Get the package of the caller of the driver and pass this to the node for CorDapp scanning
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config)
nodeAndThreadFuture.map { (node, thread) -> {
} }
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread)
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, callerPackage!!)
return processFuture.flatMap { process ->
// We continue to use SSL enabled port for RPC when its for node user.
establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle.OutOfProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, debugPort, process)
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): ListenableFuture<A> {
@ -658,20 +747,37 @@ class DriverDSL(
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]
private fun startNode(
private fun startInProcessNode(
executorService: ListeningScheduledExecutorService,
nodeConf: FullNodeConfiguration,
config: Config
): ListenableFuture<Pair<Node, Thread>> {
return executorService.submit<Pair<Node, Thread>> {
log.info("Starting in-process Node ${nodeConf.myLegalName.commonName}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val clock: Clock = if (nodeConf.useTestClock) TestClock() else NodeClock()
// TODO pass the version in?
val node = Node(nodeConf, nodeConf.calculateServices(), MOCK_VERSION_INFO, clock)
val nodeThread = thread(name = nodeConf.myLegalName.commonName) {
node to nodeThread
}.flatMap { nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread } }
private fun startOutOfProcessNode(
executorService: ListeningScheduledExecutorService,
nodeConf: FullNodeConfiguration,
config: Config,
quasarJarPath: String,
debugPort: Int?,
overriddenSystemProperties: Map<String, String>
overriddenSystemProperties: Map<String, String>,
callerPackage: String
): ListenableFuture<Process> {
// Get the package of the caller of the driver and pass this to the node for CorDapp scanning
val callerPackage = Exception()
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`.name }
val processFuture = executorService.submit<Process> {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.commonName}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
@ -0,0 +1,108 @@
package net.corda.testing.performance
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch
import net.corda.core.utilities.Rate
import net.corda.testing.driver.ShutdownManager
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock
fun startTightLoopInjector(
parallelism: Int,
numberOfInjections: Int,
queueBound: Int,
work: () -> Unit
) {
ShutdownManager.run {
val executor = Executors.newFixedThreadPool(parallelism)
registerShutdown { executor.shutdown() }
val remainingLatch = CountDownLatch(numberOfInjections)
val queuedCount = AtomicInteger(0)
val lock = ReentrantLock()
val canQueueAgain = lock.newCondition()
val injector = thread(name = "injector") {
val leftToSubmit = AtomicInteger(numberOfInjections)
while (true) {
if (leftToSubmit.getAndDecrement() == 0) break
executor.submit {
if (queuedCount.decrementAndGet() < queueBound / 2) {
lock.withLock {
if (queuedCount.incrementAndGet() > queueBound) {
lock.withLock {
registerShutdown { injector.interrupt() }
fun startPublishingFixedRateInjector(
metricRegistry: MetricRegistry,
parallelism: Int,
overallDuration: Duration,
injectionRate: Rate,
queueSizeMetricName: String = "QueueSize",
workDurationMetricName: String = "WorkDuration",
work: () -> Unit
) {
val workSemaphore = Semaphore(0)
metricRegistry.register(queueSizeMetricName, Gauge { workSemaphore.availablePermits() })
val workDurationTimer = metricRegistry.timer(workDurationMetricName)
ShutdownManager.run {
val executor = Executors.newSingleThreadScheduledExecutor()
registerShutdown { executor.shutdown() }
val workExecutor = Executors.newFixedThreadPool(parallelism)
registerShutdown { workExecutor.shutdown() }
val timings = Collections.synchronizedList(ArrayList<Long>())
for (i in 1..parallelism) {
workExecutor.submit {
try {
while (true) {
workDurationTimer.time {
Stopwatch.createStarted().apply {
} catch (throwable: Throwable) {
val injector = executor.scheduleAtFixedRate(
workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt())
registerShutdown {
@ -0,0 +1,34 @@
package net.corda.testing.performance
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricRegistry
import net.corda.testing.driver.ShutdownManager
import java.util.concurrent.TimeUnit
import javax.management.ObjectName
import kotlin.concurrent.thread
fun startReporter(shutdownManager: ShutdownManager, metricRegistry: MetricRegistry = MetricRegistry()): MetricRegistry {
val jmxReporter = thread {
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
shutdownManager.registerShutdown { jmxReporter.interrupt() }
val consoleReporter = thread {
ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS)
shutdownManager.registerShutdown { consoleReporter.interrupt() }
return metricRegistry
@ -80,6 +80,7 @@ fun <A> verifierDriver(
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
dsl: VerifierExposedDSLInterface.() -> A
) = genericDriver(
driverDsl = VerifierDriverDSL(
@ -90,7 +91,8 @@ fun <A> verifierDriver(
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug
isDebug = isDebug,
startNodesInProcess = startNodesInProcess
coerce = { it },
Reference in New Issue
Block a user