diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index 8366391462..362f001232 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -1,34 +1,27 @@ package net.corda.client.rpc -import com.codahale.metrics.ConsoleReporter -import com.codahale.metrics.Gauge -import com.codahale.metrics.JmxReporter -import com.codahale.metrics.MetricRegistry import com.google.common.base.Stopwatch import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.messaging.RPCOps import net.corda.core.minutes import net.corda.core.seconds -import net.corda.core.utilities.Rate import net.corda.core.utilities.div import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.testing.RPCDriverExposedDSLInterface import net.corda.testing.driver.ShutdownManager import net.corda.testing.measure +import net.corda.testing.performance.startPublishingFixedRateInjector +import net.corda.testing.performance.startReporter +import net.corda.testing.performance.startTightLoopInjector import net.corda.testing.rpcDriver import org.junit.Ignore import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized -import java.time.Duration import java.util.* -import java.util.concurrent.* -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.locks.ReentrantLock -import javax.management.ObjectName -import kotlin.concurrent.thread -import kotlin.concurrent.withLock +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit @Ignore("Only use this locally for profiling") @RunWith(Parameterized::class) @@ -106,10 +99,9 @@ class RPCPerformanceTests : AbstractRPCTest() { val numberOfRequests = overallTraffic / (2 * inputOutputSize) val timings = Collections.synchronizedList(ArrayList()) - val executor = Executors.newFixedThreadPool(8) val totalElapsed = Stopwatch.createStarted().apply { - startInjectorWithBoundedQueue( - executor = executor, + startTightLoopInjector( + parallelism = 8, numberOfInjections = numberOfRequests.toInt(), queueBound = 100 ) { @@ -119,7 +111,6 @@ class RPCPerformanceTests : AbstractRPCTest() { timings.add(elapsed) } }.stop().elapsed(TimeUnit.MICROSECONDS) - executor.shutdownNow() SimpleRPCResult( requestPerSecond = 1000000.0 * numberOfRequests.toDouble() / totalElapsed.toDouble(), averageIndividualMs = timings.average() / 1000.0, @@ -135,7 +126,7 @@ class RPCPerformanceTests : AbstractRPCTest() { @Test fun `consumption rate`() { rpcDriver { - val metricRegistry = startReporter() + val metricRegistry = startReporter(shutdownManager) val proxy = testProxy( RPCClientConfiguration.default.copy( reapInterval = 1.seconds, @@ -148,14 +139,13 @@ class RPCPerformanceTests : AbstractRPCTest() { producerPoolBound = 8 ) ) - measurePerformancePublishMetrics( + startPublishingFixedRateInjector( metricRegistry = metricRegistry, parallelism = 8, overallDuration = 5.minutes, injectionRate = 20000L / TimeUnit.SECONDS, queueSizeMetricName = "$mode.QueueSize", workDurationMetricName = "$mode.WorkDuration", - shutdownManager = this.shutdownManager, work = { proxy.ops.simpleReply(ByteArray(4096), 4096) } @@ -177,19 +167,17 @@ class RPCPerformanceTests : AbstractRPCTest() { consumerPoolSize = 1 ) ) - val executor = Executors.newFixedThreadPool(clientParallelism) val numberOfMessages = 1000 val bigSize = 10_000_000 val elapsed = Stopwatch.createStarted().apply { - startInjectorWithBoundedQueue( - executor = executor, + startTightLoopInjector( + parallelism = clientParallelism, numberOfInjections = numberOfMessages, queueBound = 4 ) { proxy.ops.simpleReply(ByteArray(bigSize), 0) } }.stop().elapsed(TimeUnit.MICROSECONDS) - executor.shutdownNow() BigMessagesResult( Mbps = bigSize.toDouble() * numberOfMessages.toDouble() / elapsed * (1000000.0 / (1024.0 * 1024.0)) ) @@ -197,120 +185,3 @@ class RPCPerformanceTests : AbstractRPCTest() { }.forEach(::println) } } - -fun measurePerformancePublishMetrics( - metricRegistry: MetricRegistry, - parallelism: Int, - overallDuration: Duration, - injectionRate: Rate, - queueSizeMetricName: String, - workDurationMetricName: String, - shutdownManager: ShutdownManager, - work: () -> Unit -) { - val workSemaphore = Semaphore(0) - metricRegistry.register(queueSizeMetricName, Gauge { workSemaphore.availablePermits() }) - val workDurationTimer = metricRegistry.timer(workDurationMetricName) - val executor = Executors.newSingleThreadScheduledExecutor() - val workExecutor = Executors.newFixedThreadPool(parallelism) - val timings = Collections.synchronizedList(ArrayList()) - for (i in 1 .. parallelism) { - workExecutor.submit { - try { - while (true) { - workSemaphore.acquire() - workDurationTimer.time { - timings.add( - Stopwatch.createStarted().apply { - work() - }.stop().elapsed(TimeUnit.MICROSECONDS) - ) - } - } - } catch (throwable: Throwable) { - throwable.printStackTrace() - } - } - } - val injector = executor.scheduleAtFixedRate( - { - workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt()) - }, - 0, - 1, - TimeUnit.SECONDS - ) - shutdownManager.registerShutdown { - injector.cancel(true) - workExecutor.shutdownNow() - executor.shutdownNow() - workExecutor.awaitTermination(1, TimeUnit.SECONDS) - executor.awaitTermination(1, TimeUnit.SECONDS) - } - Thread.sleep(overallDuration.toMillis()) -} - -fun startInjectorWithBoundedQueue( - executor: ExecutorService, - numberOfInjections: Int, - queueBound: Int, - work: () -> Unit -) { - val remainingLatch = CountDownLatch(numberOfInjections) - val queuedCount = AtomicInteger(0) - val lock = ReentrantLock() - val canQueueAgain = lock.newCondition() - val injectorShutdown = AtomicBoolean(false) - val injector = thread(name = "injector") { - while (true) { - if (injectorShutdown.get()) break - executor.submit { - work() - if (queuedCount.decrementAndGet() < queueBound / 2) { - lock.withLock { - canQueueAgain.signal() - } - } - remainingLatch.countDown() - } - if (queuedCount.incrementAndGet() > queueBound) { - lock.withLock { - canQueueAgain.await() - } - } - } - } - remainingLatch.await() - injectorShutdown.set(true) - injector.join() -} - -fun RPCDriverExposedDSLInterface.startReporter(): MetricRegistry { - val metricRegistry = MetricRegistry() - val jmxReporter = thread { - JmxReporter. - forRegistry(metricRegistry). - inDomain("net.corda"). - createsObjectNamesWith { _, domain, name -> - // Make the JMX hierarchy a bit better organised. - val category = name.substringBefore('.') - val subName = name.substringAfter('.', "") - if (subName == "") - ObjectName("$domain:name=$category") - else - ObjectName("$domain:type=$category,name=$subName") - }. - build(). - start() - } - val consoleReporter = thread { - ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS) - } - shutdownManager.registerShutdown { - jmxReporter.interrupt() - consoleReporter.interrupt() - jmxReporter.join() - consoleReporter.join() - } - return metricRegistry -} diff --git a/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt b/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt new file mode 100644 index 0000000000..3b42fbeae0 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt @@ -0,0 +1,125 @@ +package net.corda.node + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.base.Stopwatch +import com.google.common.util.concurrent.Futures +import net.corda.core.contracts.DOLLARS +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.minutes +import net.corda.core.node.services.ServiceInfo +import net.corda.core.serialization.OpaqueBytes +import net.corda.core.utilities.div +import net.corda.flows.CashIssueFlow +import net.corda.flows.CashPaymentFlow +import net.corda.node.services.startFlowPermission +import net.corda.node.services.transactions.SimpleNotaryService +import net.corda.nodeapi.User +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.driver +import net.corda.testing.performance.startPublishingFixedRateInjector +import net.corda.testing.performance.startReporter +import net.corda.testing.performance.startTightLoopInjector +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import java.lang.management.ManagementFactory +import java.util.* +import java.util.concurrent.TimeUnit +import kotlin.streams.toList + + +private fun checkQuasarAgent() { + if (!(ManagementFactory.getRuntimeMXBean().inputArguments.any { it.contains("quasar") })) { + throw IllegalStateException("No quasar agent") + } +} + +@Ignore("Run these locally") +class NodePerformanceTests { + @StartableByRPC + class EmptyFlow : FlowLogic() { + @Suspendable + override fun call() { + } + } + + private data class FlowMeasurementResult( + val flowPerSecond: Double, + val averageMs: Double + ) + + @Before + fun before() { + checkQuasarAgent() + } + + @Test + fun `empty flow per second`() { + driver(startNodesInProcess = true) { + val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlowPermission())))).get() + + a.rpcClientToNode().use("A", "A") { connection -> + val timings = Collections.synchronizedList(ArrayList()) + val N = 10000 + val overallTiming = Stopwatch.createStarted().apply { + startTightLoopInjector( + parallelism = 8, + numberOfInjections = N, + queueBound = 50 + ) { + val timing = Stopwatch.createStarted().apply { + connection.proxy.startFlow(::EmptyFlow).returnValue.get() + }.stop().elapsed(TimeUnit.MICROSECONDS) + timings.add(timing) + } + }.stop().elapsed(TimeUnit.MICROSECONDS) + println( + FlowMeasurementResult( + flowPerSecond = N / (overallTiming * 0.000001), + averageMs = timings.average() * 0.001 + ) + ) + } + } + } + + @Test + fun `empty flow rate`() { + driver(startNodesInProcess = true) { + val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlowPermission())))).get() + a as NodeHandle.InProcess + val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics) + a.rpcClientToNode().use("A", "A") { connection -> + startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 2000L / TimeUnit.SECONDS) { + connection.proxy.startFlow(::EmptyFlow).returnValue.get() + } + } + } + } + + @Test + fun `self pay rate`() { + driver(startNodesInProcess = true) { + val a = startNode( + rpcUsers = listOf(User("A", "A", setOf(startFlowPermission(), startFlowPermission()))), + advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)) + ).get() + a as NodeHandle.InProcess + val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics) + a.rpcClientToNode().use("A", "A") { connection -> + println("ISSUING") + val doneFutures = (1..100).toList().parallelStream().map { + connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), a.nodeInfo.legalIdentity, a.nodeInfo.notaryIdentity).returnValue + }.toList() + Futures.allAsList(doneFutures).get() + println("STARTING PAYMENT") + startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 100L / TimeUnit.SECONDS) { + connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, a.nodeInfo.legalIdentity).returnValue.get() + } + } + + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt index 62f8b1fc04..f5ec2cbeae 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt @@ -29,7 +29,7 @@ import kotlin.test.assertEquals class DistributedServiceTests : DriverBasedTest() { lateinit var alice: NodeHandle - lateinit var notaries: List + lateinit var notaries: List lateinit var aliceProxy: CordaRPCOps lateinit var raftNotaryIdentity: Party lateinit var notaryStateMachines: Observable> @@ -52,7 +52,7 @@ class DistributedServiceTests : DriverBasedTest() { alice = aliceFuture.get() val (notaryIdentity, notaryNodes) = notariesFuture.get() raftNotaryIdentity = notaryIdentity - notaries = notaryNodes + notaries = notaryNodes.map { it as NodeHandle.OutOfProcess } assertEquals(notaries.size, clusterSize) assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) diff --git a/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index 9c9182d278..40079f3bdf 100644 --- a/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/test-utils/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -25,10 +25,10 @@ class DriverTests { private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2) - private fun nodeMustBeUp(handleFuture: ListenableFuture) = handleFuture.getOrThrow().apply { + private fun nodeMustBeUp(handleFuture: ListenableFuture) = handleFuture.getOrThrow().apply { val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the port is bound - addressMustBeBound(executorService, hostAndPort, process) + addressMustBeBound(executorService, hostAndPort, (this as? NodeHandle.OutOfProcess)?.process) } private fun nodeMustBeDown(handle: NodeHandle) { diff --git a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt index 06c186ef5b..925df7a4ad 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt @@ -224,6 +224,7 @@ fun rpcDriver( systemProperties: Map = emptyMap(), useTestClock: Boolean = false, networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false), + startNodesInProcess: Boolean = false, dsl: RPCDriverExposedDSLInterface.() -> A ) = genericDriver( driverDsl = RPCDriverDSL( @@ -234,7 +235,8 @@ fun rpcDriver( driverDirectory = driverDirectory.toAbsolutePath(), useTestClock = useTestClock, networkMapStartStrategy = networkMapStartStrategy, - isDebug = isDebug + isDebug = isDebug, + startNodesInProcess = startNodesInProcess ) ), coerce = { it }, diff --git a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt index 8aa09cea21..53798b4c37 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -19,17 +19,21 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.core.utilities.* +import net.corda.node.internal.Node import net.corda.node.internal.NodeStartup +import net.corda.node.serialization.NodeClock import net.corda.node.services.config.* import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.ServiceIdentityGenerator +import net.corda.node.utilities.TestClock import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.User import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.config.parseAs import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.addShutdownHook +import net.corda.testing.MOCK_VERSION_INFO import okhttp3.OkHttpClient import okhttp3.Request import org.bouncycastle.asn1.x500.X500Name @@ -38,6 +42,7 @@ import java.io.File import java.net.* import java.nio.file.Path import java.nio.file.Paths +import java.time.Clock import java.time.Duration import java.time.Instant import java.time.ZoneOffset.UTC @@ -47,6 +52,7 @@ import java.util.concurrent.* import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread /** @@ -54,6 +60,8 @@ import java.util.concurrent.atomic.AtomicInteger * * The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes. + * + * TODO this file is getting way too big, it should be split into several files. */ private val log: Logger = loggerFor() @@ -66,19 +74,25 @@ interface DriverDSLExposedInterface : CordformContext { * Starts a [net.corda.node.internal.Node] in a separate process. * * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something - * random. Note that this must be unique as the driver uses it as a primary key! + * random. Note that this must be unique as the driver uses it as a primary key! * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. * @param verifierType The type of transaction verifier to use. See: [VerifierType] * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. + * @param startInSameProcess Determines if the node should be started inside the same process the Driver is running + * in. If null the Driver-level value will be used. * @return The [NodeInfo] of the started up node retrieved from the network map service. */ fun startNode(providedName: X500Name? = null, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), verifierType: VerifierType = VerifierType.InMemory, - customOverrides: Map = emptyMap()): ListenableFuture + customOverrides: Map = emptyMap(), + startInSameProcess: Boolean? = null): ListenableFuture - fun startNodes(nodes: List): List> + fun startNodes( + nodes: List, + startInSameProcess: Boolean? = null + ): List> /** * Starts a distributed notary cluster. @@ -88,6 +102,8 @@ interface DriverDSLExposedInterface : CordformContext { * @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type]. * @param verifierType The type of transaction verifier to use. See: [VerifierType] * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. + * @param startInSameProcess Determines if the node should be started inside the same process the Driver is running + * in. If null the Driver-level value will be used. * @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster. */ fun startNotaryCluster( @@ -95,7 +111,8 @@ interface DriverDSLExposedInterface : CordformContext { clusterSize: Int = 3, type: ServiceType = RaftValidatingNotaryService.type, verifierType: VerifierType = VerifierType.InMemory, - rpcUsers: List = emptyList()): Future>> + rpcUsers: List = emptyList(), + startInSameProcess: Boolean? = null): ListenableFuture>> /** * Starts a web server for a node @@ -107,8 +124,10 @@ interface DriverDSLExposedInterface : CordformContext { /** * Starts a network map service node. Note that only a single one should ever be running, so you will probably want * to set networkMapStartStrategy to Dedicated(false) in your [driver] call. + * @param startInProcess Determines if the node should be started inside this process. If null the Driver-level + * value will be used. */ - fun startDedicatedNetworkMapService(): ListenableFuture + fun startDedicatedNetworkMapService(startInProcess: Boolean? = null): ListenableFuture fun waitForAllNodesToFinish() @@ -138,13 +157,30 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface { fun shutdown() } -data class NodeHandle( - val nodeInfo: NodeInfo, - val rpc: CordaRPCOps, - val configuration: FullNodeConfiguration, - val webAddress: HostAndPort, - val process: Process -) { +sealed class NodeHandle { + abstract val nodeInfo: NodeInfo + abstract val rpc: CordaRPCOps + abstract val configuration: FullNodeConfiguration + abstract val webAddress: HostAndPort + + data class OutOfProcess( + override val nodeInfo: NodeInfo, + override val rpc: CordaRPCOps, + override val configuration: FullNodeConfiguration, + override val webAddress: HostAndPort, + val debugPort: Int?, + val process: Process + ) : NodeHandle() + + data class InProcess( + override val nodeInfo: NodeInfo, + override val rpc: CordaRPCOps, + override val configuration: FullNodeConfiguration, + override val webAddress: HostAndPort, + val node: Node, + val nodeThread: Thread + ) : NodeHandle() + fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!) } @@ -186,6 +222,7 @@ sealed class PortAllocation { * * The driver implicitly bootstraps a [NetworkMapService]. * + * @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging. * @param driverDirectory The base directory node directories go into, defaults to "build//". The node * directories themselves are "//", where legalName defaults to "-" * and may be specified in [DriverDSL.startNode]. @@ -193,7 +230,9 @@ sealed class PortAllocation { * @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental. * @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty. * @param useTestClock If true the test clock will be used in Node. - * @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging. + * @param networkMapStartStrategy Determines whether a network map node is started automatically. + * @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or + * not. Note that this may be overridden in [DriverDSLExposedInterface.startNode]. * @param dsl The dsl itself. * @return The value returned in the [dsl] closure. */ @@ -206,6 +245,7 @@ fun driver( systemProperties: Map = emptyMap(), useTestClock: Boolean = false, networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true), + startNodesInProcess: Boolean = false, dsl: DriverDSLExposedInterface.() -> A ) = genericDriver( driverDsl = DriverDSL( @@ -215,6 +255,7 @@ fun driver( driverDirectory = driverDirectory.toAbsolutePath(), useTestClock = useTestClock, networkMapStartStrategy = networkMapStartStrategy, + startNodesInProcess = startNodesInProcess, isDebug = isDebug ), coerce = { it }, @@ -259,13 +300,13 @@ class ListenProcessDeathException(message: String) : Exception(message) /** * @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended. */ -fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process) { +fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null) { addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow() } -fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process): ListenableFuture { +fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null): ListenableFuture { return poll(executorService, "address $hostAndPort to bind") { - if (!listenProcess.isAlive) { + if (listenProcess != null && !listenProcess.isAlive) { throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}") } try { @@ -337,6 +378,19 @@ class ShutdownManager(private val executorService: ExecutorService) { private val state = ThreadBox(State()) + companion object { + inline fun run(providedExecutorService: ExecutorService? = null, block: ShutdownManager.() -> A): A { + val executorService = providedExecutorService ?: Executors.newScheduledThreadPool(1) + val shutdownManager = ShutdownManager(executorService) + try { + return block(shutdownManager) + } finally { + shutdownManager.shutdown() + providedExecutorService ?: executorService.shutdown() + } + } + } + fun shutdown() { val shutdownFutures = state.locked { if (isShutdown) { @@ -421,13 +475,15 @@ class DriverDSL( val driverDirectory: Path, val useTestClock: Boolean, val isDebug: Boolean, - val networkMapStartStrategy: NetworkMapStartStrategy + val networkMapStartStrategy: NetworkMapStartStrategy, + val startNodesInProcess: Boolean ) : DriverDSLInternalInterface { private val dedicatedNetworkMapAddress = portAllocation.nextHostAndPort() - var _executorService: ListeningScheduledExecutorService? = null + private var _executorService: ListeningScheduledExecutorService? = null val executorService get() = _executorService!! - var _shutdownManager: ShutdownManager? = null + private var _shutdownManager: ShutdownManager? = null override val shutdownManager get() = _shutdownManager!! + private val callerPackage = getCallerPackage() class State { val processes = ArrayList>() @@ -500,57 +556,50 @@ class DriverDSL( advertisedServices: Set, rpcUsers: List, verifierType: VerifierType, - customOverrides: Map): ListenableFuture { + customOverrides: Map, + startInSameProcess: Boolean? + ): ListenableFuture { val p2pAddress = portAllocation.nextHostAndPort() val rpcAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort() // TODO: Derive name from the full picked name, don't just wrap the common name val name = providedName ?: X509Utilities.getX509Name("${oneOf(names).commonName}-${p2pAddress.port}","London","demo@r3.com",null) - return startNode(p2pAddress, webAddress, name, configOf( - "myLegalName" to name.toString(), - "p2pAddress" to p2pAddress.toString(), - "rpcAddress" to rpcAddress.toString(), - "webAddress" to webAddress.toString(), - "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() }, - "networkMapService" to networkMapServiceConfigLookup(emptyList())(name), - "useTestClock" to useTestClock, - "rpcUsers" to rpcUsers.map { it.toMap() }, - "verifierType" to verifierType.name - ) + customOverrides) - } - - private fun startNode(p2pAddress: HostAndPort, webAddress: HostAndPort, nodeName: X500Name, configOverrides: Config) = run { - val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val config = ConfigHelper.loadConfig( - baseDirectory = baseDirectory(nodeName), + baseDirectory = baseDirectory(name), allowMissingConfig = true, - configOverrides = configOverrides) - val configuration = config.parseAs() - val processFuture = startNode(executorService, configuration, config, quasarJarPath, debugPort, systemProperties) - registerProcess(processFuture) - processFuture.flatMap { process -> - // We continue to use SSL enabled port for RPC when its for node user. - establishRpc(p2pAddress, configuration).flatMap { rpc -> - rpc.waitUntilRegisteredWithNetworkMap().map { - NodeHandle(rpc.nodeIdentity(), rpc, configuration, webAddress, process) - } - } - } + configOverrides = configOf( + "myLegalName" to name.toString(), + "p2pAddress" to p2pAddress.toString(), + "rpcAddress" to rpcAddress.toString(), + "webAddress" to webAddress.toString(), + "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() }, + "networkMapService" to networkMapServiceConfigLookup(emptyList())(name), + "useTestClock" to useTestClock, + "rpcUsers" to rpcUsers.map { it.toMap() }, + "verifierType" to verifierType.name + ) + customOverrides + ) + return startNodeInternal(config, webAddress, startInSameProcess) } - override fun startNodes(nodes: List): List> { + override fun startNodes(nodes: List, startInSameProcess: Boolean?): List> { val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes) - return nodes.map { - val p2pAddress = HostAndPort.fromString(it.config.getString("p2pAddress")); portAllocation.nextHostAndPort() + return nodes.map { node -> portAllocation.nextHostAndPort() // rpcAddress val webAddress = portAllocation.nextHostAndPort() - val name = X500Name(it.name) - startNode(p2pAddress, webAddress, name, it.config + mapOf( - "extraAdvertisedServiceIds" to it.advertisedServices, - "networkMapService" to networkMapServiceConfigLookup(name), - "rpcUsers" to it.rpcUsers, - "notaryClusterAddresses" to it.notaryClusterAddresses - )) + val name = X500Name(node.name) + + val config = ConfigHelper.loadConfig( + baseDirectory = baseDirectory(name), + allowMissingConfig = true, + configOverrides = node.config + mapOf( + "extraAdvertisedServiceIds" to node.advertisedServices, + "networkMapService" to networkMapServiceConfigLookup(name), + "rpcUsers" to node.rpcUsers, + "notaryClusterAddresses" to node.notaryClusterAddresses + ) + ) + startNodeInternal(config, webAddress, startInSameProcess) } } @@ -559,7 +608,8 @@ class DriverDSL( clusterSize: Int, type: ServiceType, verifierType: VerifierType, - rpcUsers: List + rpcUsers: List, + startInSameProcess: Boolean? ): ListenableFuture>> { val nodeNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") } val paths = nodeNames.map { baseDirectory(it) } @@ -568,7 +618,14 @@ class DriverDSL( val notaryClusterAddress = portAllocation.nextHostAndPort() // Start the first node that will bootstrap the cluster - val firstNotaryFuture = startNode(nodeNames.first(), advertisedServices, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) + val firstNotaryFuture = startNode( + providedName = nodeNames.first(), + advertisedServices = advertisedServices, + rpcUsers = rpcUsers, + verifierType = verifierType, + customOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()), + startInSameProcess = startInSameProcess + ) // All other nodes will join the cluster val restNotaryFutures = nodeNames.drop(1).map { val nodeAddress = portAllocation.nextHostAndPort() @@ -613,6 +670,8 @@ class DriverDSL( Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build()) ) _shutdownManager = ShutdownManager(executorService) + // We set this property so that in-process nodes find cordapps. Out-of-process nodes need this passed in when started. + System.setProperty("net.corda.node.cordapp.scan.package", callerPackage) if (networkMapStartStrategy.startDedicated) { startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes. } @@ -620,9 +679,8 @@ class DriverDSL( override fun baseDirectory(nodeName: X500Name): Path = driverDirectory / nodeName.commonName.replace(WHITESPACE, "") - override fun startDedicatedNetworkMapService(): ListenableFuture { - val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val apiAddress = portAllocation.nextHostAndPort().toString() + override fun startDedicatedNetworkMapService(startInProcess: Boolean?): ListenableFuture { + val webAddress = portAllocation.nextHostAndPort() val networkMapLegalName = networkMapStartStrategy.legalName val config = ConfigHelper.loadConfig( baseDirectory = baseDirectory(networkMapLegalName), @@ -631,16 +689,44 @@ class DriverDSL( "myLegalName" to networkMapLegalName.toString(), // TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all // node port numbers to be shifted, so all demos and docs need to be updated accordingly. - "webAddress" to apiAddress, + "webAddress" to webAddress.toString(), "p2pAddress" to dedicatedNetworkMapAddress.toString(), "useTestClock" to useTestClock ) ) + return startNodeInternal(config, webAddress, startInProcess) + } - log.info("Starting network-map-service") - val startNode = startNode(executorService, config.parseAs(), config, quasarJarPath, debugPort, systemProperties) - registerProcess(startNode) - return startNode.flatMap { addressMustBeBoundFuture(executorService, dedicatedNetworkMapAddress, it) } + private fun startNodeInternal(config: Config, webAddress: HostAndPort, startInProcess: Boolean?): ListenableFuture { + val nodeConfiguration = config.parseAs() + if (startInProcess ?: startNodesInProcess) { + val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config) + shutdownManager.registerShutdown( + nodeAndThreadFuture.map { (node, thread) -> { + node.stop() + thread.interrupt() + } } + ) + return nodeAndThreadFuture.flatMap { (node, thread) -> + establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc -> + rpc.waitUntilRegisteredWithNetworkMap().map { + NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread) + } + } + } + } else { + val debugPort = if (isDebug) debugPortAllocation.nextPort() else null + val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, callerPackage) + registerProcess(processFuture) + return processFuture.flatMap { process -> + // We continue to use SSL enabled port for RPC when its for node user. + establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc -> + rpc.waitUntilRegisteredWithNetworkMap().map { + NodeHandle.OutOfProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, debugPort, process) + } + } + } + } } override fun pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): ListenableFuture { @@ -658,20 +744,37 @@ class DriverDSL( private fun oneOf(array: Array) = array[Random().nextInt(array.size)] - private fun startNode( + private fun startInProcessNode( + executorService: ListeningScheduledExecutorService, + nodeConf: FullNodeConfiguration, + config: Config + ): ListenableFuture> { + return executorService.submit> { + log.info("Starting in-process Node ${nodeConf.myLegalName.commonName}") + // Write node.conf + writeConfig(nodeConf.baseDirectory, "node.conf", config) + val clock: Clock = if (nodeConf.useTestClock) TestClock() else NodeClock() + // TODO pass the version in? + val node = Node(nodeConf, nodeConf.calculateServices(), MOCK_VERSION_INFO, clock) + node.start() + val nodeThread = thread(name = nodeConf.myLegalName.commonName) { + node.run() + } + node to nodeThread + }.flatMap { nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread } } + } + + private fun startOutOfProcessNode( executorService: ListeningScheduledExecutorService, nodeConf: FullNodeConfiguration, config: Config, quasarJarPath: String, debugPort: Int?, - overriddenSystemProperties: Map + overriddenSystemProperties: Map, + callerPackage: String ): ListenableFuture { - // Get the package of the caller of the driver and pass this to the node for CorDapp scanning - val callerPackage = Exception() - .stackTrace - .first { it.fileName != "Driver.kt" } - .let { Class.forName(it.className).`package`.name } val processFuture = executorService.submit { + log.info("Starting out-of-process Node ${nodeConf.myLegalName.commonName}") // Write node.conf writeConfig(nodeConf.baseDirectory, "node.conf", config) @@ -726,6 +829,13 @@ class DriverDSL( ) }.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } } } + + private fun getCallerPackage(): String { + return Exception() + .stackTrace + .first { it.fileName != "Driver.kt" } + .let { Class.forName(it.className).`package`.name } + } } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/performance/Injectors.kt b/test-utils/src/main/kotlin/net/corda/testing/performance/Injectors.kt new file mode 100644 index 0000000000..5690d0dba9 --- /dev/null +++ b/test-utils/src/main/kotlin/net/corda/testing/performance/Injectors.kt @@ -0,0 +1,108 @@ +package net.corda.testing.performance + +import com.codahale.metrics.Gauge +import com.codahale.metrics.MetricRegistry +import com.google.common.base.Stopwatch +import net.corda.core.utilities.Rate +import net.corda.testing.driver.ShutdownManager +import java.time.Duration +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.Semaphore +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock +import kotlin.concurrent.thread +import kotlin.concurrent.withLock + +fun startTightLoopInjector( + parallelism: Int, + numberOfInjections: Int, + queueBound: Int, + work: () -> Unit +) { + ShutdownManager.run { + val executor = Executors.newFixedThreadPool(parallelism) + registerShutdown { executor.shutdown() } + val remainingLatch = CountDownLatch(numberOfInjections) + val queuedCount = AtomicInteger(0) + val lock = ReentrantLock() + val canQueueAgain = lock.newCondition() + val injector = thread(name = "injector") { + val leftToSubmit = AtomicInteger(numberOfInjections) + while (true) { + if (leftToSubmit.getAndDecrement() == 0) break + executor.submit { + work() + if (queuedCount.decrementAndGet() < queueBound / 2) { + lock.withLock { + canQueueAgain.signal() + } + } + remainingLatch.countDown() + } + if (queuedCount.incrementAndGet() > queueBound) { + lock.withLock { + canQueueAgain.await() + } + } + } + } + registerShutdown { injector.interrupt() } + remainingLatch.await() + injector.join() + } +} + +fun startPublishingFixedRateInjector( + metricRegistry: MetricRegistry, + parallelism: Int, + overallDuration: Duration, + injectionRate: Rate, + queueSizeMetricName: String = "QueueSize", + workDurationMetricName: String = "WorkDuration", + work: () -> Unit +) { + val workSemaphore = Semaphore(0) + metricRegistry.register(queueSizeMetricName, Gauge { workSemaphore.availablePermits() }) + val workDurationTimer = metricRegistry.timer(workDurationMetricName) + ShutdownManager.run { + val executor = Executors.newSingleThreadScheduledExecutor() + registerShutdown { executor.shutdown() } + val workExecutor = Executors.newFixedThreadPool(parallelism) + registerShutdown { workExecutor.shutdown() } + val timings = Collections.synchronizedList(ArrayList()) + for (i in 1..parallelism) { + workExecutor.submit { + try { + while (true) { + workSemaphore.acquire() + workDurationTimer.time { + timings.add( + Stopwatch.createStarted().apply { + work() + }.stop().elapsed(TimeUnit.MICROSECONDS) + ) + } + } + } catch (throwable: Throwable) { + throwable.printStackTrace() + } + } + } + val injector = executor.scheduleAtFixedRate( + { + workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt()) + }, + 0, + 1, + TimeUnit.SECONDS + ) + registerShutdown { + injector.cancel(true) + } + Thread.sleep(overallDuration.toMillis()) + } +} + diff --git a/test-utils/src/main/kotlin/net/corda/testing/performance/Reporter.kt b/test-utils/src/main/kotlin/net/corda/testing/performance/Reporter.kt new file mode 100644 index 0000000000..fc85aa9aed --- /dev/null +++ b/test-utils/src/main/kotlin/net/corda/testing/performance/Reporter.kt @@ -0,0 +1,34 @@ +package net.corda.testing.performance + +import com.codahale.metrics.ConsoleReporter +import com.codahale.metrics.JmxReporter +import com.codahale.metrics.MetricRegistry +import net.corda.testing.driver.ShutdownManager +import java.util.concurrent.TimeUnit +import javax.management.ObjectName +import kotlin.concurrent.thread + +fun startReporter(shutdownManager: ShutdownManager, metricRegistry: MetricRegistry = MetricRegistry()): MetricRegistry { + val jmxReporter = thread { + JmxReporter. + forRegistry(metricRegistry). + inDomain("net.corda"). + createsObjectNamesWith { _, domain, name -> + // Make the JMX hierarchy a bit better organised. + val category = name.substringBefore('.') + val subName = name.substringAfter('.', "") + if (subName == "") + ObjectName("$domain:name=$category") + else + ObjectName("$domain:type=$category,name=$subName") + }. + build(). + start() + } + shutdownManager.registerShutdown { jmxReporter.interrupt() } + val consoleReporter = thread { + ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS) + } + shutdownManager.registerShutdown { consoleReporter.interrupt() } + return metricRegistry +} diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt index 7713b6b745..a343c3bd2c 100644 --- a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt @@ -80,6 +80,7 @@ fun verifierDriver( systemProperties: Map = emptyMap(), useTestClock: Boolean = false, networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false), + startNodesInProcess: Boolean = false, dsl: VerifierExposedDSLInterface.() -> A ) = genericDriver( driverDsl = VerifierDriverDSL( @@ -90,7 +91,8 @@ fun verifierDriver( driverDirectory = driverDirectory.toAbsolutePath(), useTestClock = useTestClock, networkMapStartStrategy = networkMapStartStrategy, - isDebug = isDebug + isDebug = isDebug, + startNodesInProcess = startNodesInProcess ) ), coerce = { it },