Merge pull request #703 from corda/aslemmer-in-process-driver-nodes-performance-tests

driver, node: Allow in-process starting of nodes, add a couple of node performance tests
This commit is contained in:
Andras Slemmer 2017-06-20 14:57:54 +01:00 committed by GitHub
commit 4dc06ed25b
9 changed files with 474 additions and 222 deletions

View File

@ -1,34 +1,27 @@
package net.corda.client.rpc package net.corda.client.rpc
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.Gauge
import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch import com.google.common.base.Stopwatch
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.minutes import net.corda.core.minutes
import net.corda.core.seconds import net.corda.core.seconds
import net.corda.core.utilities.Rate
import net.corda.core.utilities.div import net.corda.core.utilities.div
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.RPCDriverExposedDSLInterface import net.corda.testing.RPCDriverExposedDSLInterface
import net.corda.testing.driver.ShutdownManager import net.corda.testing.driver.ShutdownManager
import net.corda.testing.measure import net.corda.testing.measure
import net.corda.testing.performance.startPublishingFixedRateInjector
import net.corda.testing.performance.startReporter
import net.corda.testing.performance.startTightLoopInjector
import net.corda.testing.rpcDriver import net.corda.testing.rpcDriver
import org.junit.Ignore import org.junit.Ignore
import org.junit.Test import org.junit.Test
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.junit.runners.Parameterized import org.junit.runners.Parameterized
import java.time.Duration
import java.util.* import java.util.*
import java.util.concurrent.* import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import javax.management.ObjectName
import kotlin.concurrent.thread
import kotlin.concurrent.withLock
@Ignore("Only use this locally for profiling") @Ignore("Only use this locally for profiling")
@RunWith(Parameterized::class) @RunWith(Parameterized::class)
@ -106,10 +99,9 @@ class RPCPerformanceTests : AbstractRPCTest() {
val numberOfRequests = overallTraffic / (2 * inputOutputSize) val numberOfRequests = overallTraffic / (2 * inputOutputSize)
val timings = Collections.synchronizedList(ArrayList<Long>()) val timings = Collections.synchronizedList(ArrayList<Long>())
val executor = Executors.newFixedThreadPool(8)
val totalElapsed = Stopwatch.createStarted().apply { val totalElapsed = Stopwatch.createStarted().apply {
startInjectorWithBoundedQueue( startTightLoopInjector(
executor = executor, parallelism = 8,
numberOfInjections = numberOfRequests.toInt(), numberOfInjections = numberOfRequests.toInt(),
queueBound = 100 queueBound = 100
) { ) {
@ -119,7 +111,6 @@ class RPCPerformanceTests : AbstractRPCTest() {
timings.add(elapsed) timings.add(elapsed)
} }
}.stop().elapsed(TimeUnit.MICROSECONDS) }.stop().elapsed(TimeUnit.MICROSECONDS)
executor.shutdownNow()
SimpleRPCResult( SimpleRPCResult(
requestPerSecond = 1000000.0 * numberOfRequests.toDouble() / totalElapsed.toDouble(), requestPerSecond = 1000000.0 * numberOfRequests.toDouble() / totalElapsed.toDouble(),
averageIndividualMs = timings.average() / 1000.0, averageIndividualMs = timings.average() / 1000.0,
@ -135,7 +126,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
@Test @Test
fun `consumption rate`() { fun `consumption rate`() {
rpcDriver { rpcDriver {
val metricRegistry = startReporter() val metricRegistry = startReporter(shutdownManager)
val proxy = testProxy( val proxy = testProxy(
RPCClientConfiguration.default.copy( RPCClientConfiguration.default.copy(
reapInterval = 1.seconds, reapInterval = 1.seconds,
@ -148,14 +139,13 @@ class RPCPerformanceTests : AbstractRPCTest() {
producerPoolBound = 8 producerPoolBound = 8
) )
) )
measurePerformancePublishMetrics( startPublishingFixedRateInjector(
metricRegistry = metricRegistry, metricRegistry = metricRegistry,
parallelism = 8, parallelism = 8,
overallDuration = 5.minutes, overallDuration = 5.minutes,
injectionRate = 20000L / TimeUnit.SECONDS, injectionRate = 20000L / TimeUnit.SECONDS,
queueSizeMetricName = "$mode.QueueSize", queueSizeMetricName = "$mode.QueueSize",
workDurationMetricName = "$mode.WorkDuration", workDurationMetricName = "$mode.WorkDuration",
shutdownManager = this.shutdownManager,
work = { work = {
proxy.ops.simpleReply(ByteArray(4096), 4096) proxy.ops.simpleReply(ByteArray(4096), 4096)
} }
@ -177,19 +167,17 @@ class RPCPerformanceTests : AbstractRPCTest() {
consumerPoolSize = 1 consumerPoolSize = 1
) )
) )
val executor = Executors.newFixedThreadPool(clientParallelism)
val numberOfMessages = 1000 val numberOfMessages = 1000
val bigSize = 10_000_000 val bigSize = 10_000_000
val elapsed = Stopwatch.createStarted().apply { val elapsed = Stopwatch.createStarted().apply {
startInjectorWithBoundedQueue( startTightLoopInjector(
executor = executor, parallelism = clientParallelism,
numberOfInjections = numberOfMessages, numberOfInjections = numberOfMessages,
queueBound = 4 queueBound = 4
) { ) {
proxy.ops.simpleReply(ByteArray(bigSize), 0) proxy.ops.simpleReply(ByteArray(bigSize), 0)
} }
}.stop().elapsed(TimeUnit.MICROSECONDS) }.stop().elapsed(TimeUnit.MICROSECONDS)
executor.shutdownNow()
BigMessagesResult( BigMessagesResult(
Mbps = bigSize.toDouble() * numberOfMessages.toDouble() / elapsed * (1000000.0 / (1024.0 * 1024.0)) Mbps = bigSize.toDouble() * numberOfMessages.toDouble() / elapsed * (1000000.0 / (1024.0 * 1024.0))
) )
@ -197,120 +185,3 @@ class RPCPerformanceTests : AbstractRPCTest() {
}.forEach(::println) }.forEach(::println)
} }
} }
fun measurePerformancePublishMetrics(
metricRegistry: MetricRegistry,
parallelism: Int,
overallDuration: Duration,
injectionRate: Rate,
queueSizeMetricName: String,
workDurationMetricName: String,
shutdownManager: ShutdownManager,
work: () -> Unit
) {
val workSemaphore = Semaphore(0)
metricRegistry.register(queueSizeMetricName, Gauge { workSemaphore.availablePermits() })
val workDurationTimer = metricRegistry.timer(workDurationMetricName)
val executor = Executors.newSingleThreadScheduledExecutor()
val workExecutor = Executors.newFixedThreadPool(parallelism)
val timings = Collections.synchronizedList(ArrayList<Long>())
for (i in 1 .. parallelism) {
workExecutor.submit {
try {
while (true) {
workSemaphore.acquire()
workDurationTimer.time {
timings.add(
Stopwatch.createStarted().apply {
work()
}.stop().elapsed(TimeUnit.MICROSECONDS)
)
}
}
} catch (throwable: Throwable) {
throwable.printStackTrace()
}
}
}
val injector = executor.scheduleAtFixedRate(
{
workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt())
},
0,
1,
TimeUnit.SECONDS
)
shutdownManager.registerShutdown {
injector.cancel(true)
workExecutor.shutdownNow()
executor.shutdownNow()
workExecutor.awaitTermination(1, TimeUnit.SECONDS)
executor.awaitTermination(1, TimeUnit.SECONDS)
}
Thread.sleep(overallDuration.toMillis())
}
fun startInjectorWithBoundedQueue(
executor: ExecutorService,
numberOfInjections: Int,
queueBound: Int,
work: () -> Unit
) {
val remainingLatch = CountDownLatch(numberOfInjections)
val queuedCount = AtomicInteger(0)
val lock = ReentrantLock()
val canQueueAgain = lock.newCondition()
val injectorShutdown = AtomicBoolean(false)
val injector = thread(name = "injector") {
while (true) {
if (injectorShutdown.get()) break
executor.submit {
work()
if (queuedCount.decrementAndGet() < queueBound / 2) {
lock.withLock {
canQueueAgain.signal()
}
}
remainingLatch.countDown()
}
if (queuedCount.incrementAndGet() > queueBound) {
lock.withLock {
canQueueAgain.await()
}
}
}
}
remainingLatch.await()
injectorShutdown.set(true)
injector.join()
}
fun RPCDriverExposedDSLInterface.startReporter(): MetricRegistry {
val metricRegistry = MetricRegistry()
val jmxReporter = thread {
JmxReporter.
forRegistry(metricRegistry).
inDomain("net.corda").
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
val consoleReporter = thread {
ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS)
}
shutdownManager.registerShutdown {
jmxReporter.interrupt()
consoleReporter.interrupt()
jmxReporter.join()
consoleReporter.join()
}
return metricRegistry
}

View File

@ -0,0 +1,125 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.google.common.base.Stopwatch
import com.google.common.util.concurrent.Futures
import net.corda.core.contracts.DOLLARS
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.minutes
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.utilities.div
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.performance.startPublishingFixedRateInjector
import net.corda.testing.performance.startReporter
import net.corda.testing.performance.startTightLoopInjector
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import java.lang.management.ManagementFactory
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.streams.toList
private fun checkQuasarAgent() {
if (!(ManagementFactory.getRuntimeMXBean().inputArguments.any { it.contains("quasar") })) {
throw IllegalStateException("No quasar agent")
}
}
@Ignore("Run these locally")
class NodePerformanceTests {
@StartableByRPC
class EmptyFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
}
}
private data class FlowMeasurementResult(
val flowPerSecond: Double,
val averageMs: Double
)
@Before
fun before() {
checkQuasarAgent()
}
@Test
fun `empty flow per second`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<EmptyFlow>())))).get()
a.rpcClientToNode().use("A", "A") { connection ->
val timings = Collections.synchronizedList(ArrayList<Long>())
val N = 10000
val overallTiming = Stopwatch.createStarted().apply {
startTightLoopInjector(
parallelism = 8,
numberOfInjections = N,
queueBound = 50
) {
val timing = Stopwatch.createStarted().apply {
connection.proxy.startFlow(::EmptyFlow).returnValue.get()
}.stop().elapsed(TimeUnit.MICROSECONDS)
timings.add(timing)
}
}.stop().elapsed(TimeUnit.MICROSECONDS)
println(
FlowMeasurementResult(
flowPerSecond = N / (overallTiming * 0.000001),
averageMs = timings.average() * 0.001
)
)
}
}
}
@Test
fun `empty flow rate`() {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<EmptyFlow>())))).get()
a as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics)
a.rpcClientToNode().use("A", "A") { connection ->
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 2000L / TimeUnit.SECONDS) {
connection.proxy.startFlow(::EmptyFlow).returnValue.get()
}
}
}
}
@Test
fun `self pay rate`() {
driver(startNodesInProcess = true) {
val a = startNode(
rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<CashIssueFlow>(), startFlowPermission<CashPaymentFlow>()))),
advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))
).get()
a as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics)
a.rpcClientToNode().use("A", "A") { connection ->
println("ISSUING")
val doneFutures = (1..100).toList().parallelStream().map {
connection.proxy.startFlow(::CashIssueFlow, 1.DOLLARS, OpaqueBytes.of(0), a.nodeInfo.legalIdentity, a.nodeInfo.notaryIdentity).returnValue
}.toList()
Futures.allAsList(doneFutures).get()
println("STARTING PAYMENT")
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 100L / TimeUnit.SECONDS) {
connection.proxy.startFlow(::CashPaymentFlow, 1.DOLLARS, a.nodeInfo.legalIdentity).returnValue.get()
}
}
}
}
}

View File

@ -29,7 +29,7 @@ import kotlin.test.assertEquals
class DistributedServiceTests : DriverBasedTest() { class DistributedServiceTests : DriverBasedTest() {
lateinit var alice: NodeHandle lateinit var alice: NodeHandle
lateinit var notaries: List<NodeHandle> lateinit var notaries: List<NodeHandle.OutOfProcess>
lateinit var aliceProxy: CordaRPCOps lateinit var aliceProxy: CordaRPCOps
lateinit var raftNotaryIdentity: Party lateinit var raftNotaryIdentity: Party
lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>> lateinit var notaryStateMachines: Observable<Pair<NodeInfo, StateMachineUpdate>>
@ -52,7 +52,7 @@ class DistributedServiceTests : DriverBasedTest() {
alice = aliceFuture.get() alice = aliceFuture.get()
val (notaryIdentity, notaryNodes) = notariesFuture.get() val (notaryIdentity, notaryNodes) = notariesFuture.get()
raftNotaryIdentity = notaryIdentity raftNotaryIdentity = notaryIdentity
notaries = notaryNodes notaries = notaryNodes.map { it as NodeHandle.OutOfProcess }
assertEquals(notaries.size, clusterSize) assertEquals(notaries.size, clusterSize)
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size) assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)

View File

@ -25,10 +25,10 @@ class DriverTests {
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2) private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
private fun nodeMustBeUp(handleFuture: ListenableFuture<NodeHandle>) = handleFuture.getOrThrow().apply { private fun nodeMustBeUp(handleFuture: ListenableFuture<out NodeHandle>) = handleFuture.getOrThrow().apply {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the port is bound // Check that the port is bound
addressMustBeBound(executorService, hostAndPort, process) addressMustBeBound(executorService, hostAndPort, (this as? NodeHandle.OutOfProcess)?.process)
} }
private fun nodeMustBeDown(handle: NodeHandle) { private fun nodeMustBeDown(handle: NodeHandle) {

View File

@ -224,6 +224,7 @@ fun <A> rpcDriver(
systemProperties: Map<String, String> = emptyMap(), systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false, useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false), networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
dsl: RPCDriverExposedDSLInterface.() -> A dsl: RPCDriverExposedDSLInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = RPCDriverDSL( driverDsl = RPCDriverDSL(
@ -234,7 +235,8 @@ fun <A> rpcDriver(
driverDirectory = driverDirectory.toAbsolutePath(), driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock, useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy, networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug isDebug = isDebug,
startNodesInProcess = startNodesInProcess
) )
), ),
coerce = { it }, coerce = { it },

View File

@ -19,17 +19,21 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup import net.corda.node.internal.NodeStartup
import net.corda.node.serialization.NodeClock
import net.corda.node.services.config.* import net.corda.node.services.config.*
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.TestClock
import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.config.parseAs import net.corda.nodeapi.config.parseAs
import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.MOCK_VERSION_INFO
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Request import okhttp3.Request
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
@ -38,6 +42,7 @@ import java.io.File
import java.net.* import java.net.*
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Clock
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.time.ZoneOffset.UTC import java.time.ZoneOffset.UTC
@ -47,6 +52,7 @@ import java.util.concurrent.*
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/** /**
@ -54,6 +60,8 @@ import java.util.concurrent.atomic.AtomicInteger
* *
* The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first * The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes. * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes.
*
* TODO this file is getting way too big, it should be split into several files.
*/ */
private val log: Logger = loggerFor<DriverDSL>() private val log: Logger = loggerFor<DriverDSL>()
@ -70,15 +78,21 @@ interface DriverDSLExposedInterface : CordformContext {
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @param verifierType The type of transaction verifier to use. See: [VerifierType] * @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return The [NodeInfo] of the started up node retrieved from the network map service. * @return The [NodeInfo] of the started up node retrieved from the network map service.
*/ */
fun startNode(providedName: X500Name? = null, fun startNode(providedName: X500Name? = null,
advertisedServices: Set<ServiceInfo> = emptySet(), advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(), rpcUsers: List<User> = emptyList(),
verifierType: VerifierType = VerifierType.InMemory, verifierType: VerifierType = VerifierType.InMemory,
customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle> customOverrides: Map<String, Any?> = emptyMap(),
startInSameProcess: Boolean? = null): ListenableFuture<out NodeHandle>
fun startNodes(nodes: List<CordformNode>): List<ListenableFuture<NodeHandle>> fun startNodes(
nodes: List<CordformNode>,
startInSameProcess: Boolean? = null
): List<ListenableFuture<out NodeHandle>>
/** /**
* Starts a distributed notary cluster. * Starts a distributed notary cluster.
@ -88,6 +102,8 @@ interface DriverDSLExposedInterface : CordformContext {
* @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type]. * @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type].
* @param verifierType The type of transaction verifier to use. See: [VerifierType] * @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster. * @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
*/ */
fun startNotaryCluster( fun startNotaryCluster(
@ -95,7 +111,8 @@ interface DriverDSLExposedInterface : CordformContext {
clusterSize: Int = 3, clusterSize: Int = 3,
type: ServiceType = RaftValidatingNotaryService.type, type: ServiceType = RaftValidatingNotaryService.type,
verifierType: VerifierType = VerifierType.InMemory, verifierType: VerifierType = VerifierType.InMemory,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>> rpcUsers: List<User> = emptyList(),
startInSameProcess: Boolean? = null): ListenableFuture<Pair<Party, List<NodeHandle>>>
/** /**
* Starts a web server for a node * Starts a web server for a node
@ -107,8 +124,10 @@ interface DriverDSLExposedInterface : CordformContext {
/** /**
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want * Starts a network map service node. Note that only a single one should ever be running, so you will probably want
* to set networkMapStartStrategy to Dedicated(false) in your [driver] call. * to set networkMapStartStrategy to Dedicated(false) in your [driver] call.
* @param startInProcess Determines if the node should be started inside this process. If null the Driver-level
* value will be used.
*/ */
fun startDedicatedNetworkMapService(): ListenableFuture<Unit> fun startDedicatedNetworkMapService(startInProcess: Boolean? = null): ListenableFuture<out NodeHandle>
fun waitForAllNodesToFinish() fun waitForAllNodesToFinish()
@ -138,13 +157,30 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun shutdown() fun shutdown()
} }
data class NodeHandle( sealed class NodeHandle {
val nodeInfo: NodeInfo, abstract val nodeInfo: NodeInfo
val rpc: CordaRPCOps, abstract val rpc: CordaRPCOps
val configuration: FullNodeConfiguration, abstract val configuration: FullNodeConfiguration
val webAddress: HostAndPort, abstract val webAddress: HostAndPort
data class OutOfProcess(
override val nodeInfo: NodeInfo,
override val rpc: CordaRPCOps,
override val configuration: FullNodeConfiguration,
override val webAddress: HostAndPort,
val debugPort: Int?,
val process: Process val process: Process
) { ) : NodeHandle()
data class InProcess(
override val nodeInfo: NodeInfo,
override val rpc: CordaRPCOps,
override val configuration: FullNodeConfiguration,
override val webAddress: HostAndPort,
val node: Node,
val nodeThread: Thread
) : NodeHandle()
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!) fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!)
} }
@ -186,6 +222,7 @@ sealed class PortAllocation {
* *
* The driver implicitly bootstraps a [NetworkMapService]. * The driver implicitly bootstraps a [NetworkMapService].
* *
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging.
* @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node * @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>" * directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
* and may be specified in [DriverDSL.startNode]. * and may be specified in [DriverDSL.startNode].
@ -193,7 +230,9 @@ sealed class PortAllocation {
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental. * @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty. * @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty.
* @param useTestClock If true the test clock will be used in Node. * @param useTestClock If true the test clock will be used in Node.
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging. * @param networkMapStartStrategy Determines whether a network map node is started automatically.
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
* @param dsl The dsl itself. * @param dsl The dsl itself.
* @return The value returned in the [dsl] closure. * @return The value returned in the [dsl] closure.
*/ */
@ -206,6 +245,7 @@ fun <A> driver(
systemProperties: Map<String, String> = emptyMap(), systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false, useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true), networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = true),
startNodesInProcess: Boolean = false,
dsl: DriverDSLExposedInterface.() -> A dsl: DriverDSLExposedInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = DriverDSL( driverDsl = DriverDSL(
@ -215,6 +255,7 @@ fun <A> driver(
driverDirectory = driverDirectory.toAbsolutePath(), driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock, useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy, networkMapStartStrategy = networkMapStartStrategy,
startNodesInProcess = startNodesInProcess,
isDebug = isDebug isDebug = isDebug
), ),
coerce = { it }, coerce = { it },
@ -259,13 +300,13 @@ class ListenProcessDeathException(message: String) : Exception(message)
/** /**
* @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended. * @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended.
*/ */
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process) { fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null) {
addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow() addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow()
} }
fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process): ListenableFuture<Unit> { fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null): ListenableFuture<Unit> {
return poll(executorService, "address $hostAndPort to bind") { return poll(executorService, "address $hostAndPort to bind") {
if (!listenProcess.isAlive) { if (listenProcess != null && !listenProcess.isAlive) {
throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}") throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")
} }
try { try {
@ -337,6 +378,19 @@ class ShutdownManager(private val executorService: ExecutorService) {
private val state = ThreadBox(State()) private val state = ThreadBox(State())
companion object {
inline fun <A> run(providedExecutorService: ExecutorService? = null, block: ShutdownManager.() -> A): A {
val executorService = providedExecutorService ?: Executors.newScheduledThreadPool(1)
val shutdownManager = ShutdownManager(executorService)
try {
return block(shutdownManager)
} finally {
shutdownManager.shutdown()
providedExecutorService ?: executorService.shutdown()
}
}
}
fun shutdown() { fun shutdown() {
val shutdownFutures = state.locked { val shutdownFutures = state.locked {
if (isShutdown) { if (isShutdown) {
@ -421,13 +475,15 @@ class DriverDSL(
val driverDirectory: Path, val driverDirectory: Path,
val useTestClock: Boolean, val useTestClock: Boolean,
val isDebug: Boolean, val isDebug: Boolean,
val networkMapStartStrategy: NetworkMapStartStrategy val networkMapStartStrategy: NetworkMapStartStrategy,
val startNodesInProcess: Boolean
) : DriverDSLInternalInterface { ) : DriverDSLInternalInterface {
private val dedicatedNetworkMapAddress = portAllocation.nextHostAndPort() private val dedicatedNetworkMapAddress = portAllocation.nextHostAndPort()
var _executorService: ListeningScheduledExecutorService? = null private var _executorService: ListeningScheduledExecutorService? = null
val executorService get() = _executorService!! val executorService get() = _executorService!!
var _shutdownManager: ShutdownManager? = null private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!! override val shutdownManager get() = _shutdownManager!!
private val callerPackage = getCallerPackage()
class State { class State {
val processes = ArrayList<ListenableFuture<Process>>() val processes = ArrayList<ListenableFuture<Process>>()
@ -500,13 +556,18 @@ class DriverDSL(
advertisedServices: Set<ServiceInfo>, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, rpcUsers: List<User>,
verifierType: VerifierType, verifierType: VerifierType,
customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> { customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?
): ListenableFuture<out NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort() val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort() val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name // TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: X509Utilities.getX509Name("${oneOf(names).commonName}-${p2pAddress.port}","London","demo@r3.com",null) val name = providedName ?: X509Utilities.getX509Name("${oneOf(names).commonName}-${p2pAddress.port}","London","demo@r3.com",null)
return startNode(p2pAddress, webAddress, name, configOf( val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to name.toString(), "myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(), "p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(), "rpcAddress" to rpcAddress.toString(),
@ -516,41 +577,29 @@ class DriverDSL(
"useTestClock" to useTestClock, "useTestClock" to useTestClock,
"rpcUsers" to rpcUsers.map { it.toMap() }, "rpcUsers" to rpcUsers.map { it.toMap() },
"verifierType" to verifierType.name "verifierType" to verifierType.name
) + customOverrides) ) + customOverrides
)
return startNodeInternal(config, webAddress, startInSameProcess)
} }
private fun startNode(p2pAddress: HostAndPort, webAddress: HostAndPort, nodeName: X500Name, configOverrides: Config) = run { override fun startNodes(nodes: List<CordformNode>, startInSameProcess: Boolean?): List<ListenableFuture<out NodeHandle>> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(nodeName),
allowMissingConfig = true,
configOverrides = configOverrides)
val configuration = config.parseAs<FullNodeConfiguration>()
val processFuture = startNode(executorService, configuration, config, quasarJarPath, debugPort, systemProperties)
registerProcess(processFuture)
processFuture.flatMap { process ->
// We continue to use SSL enabled port for RPC when its for node user.
establishRpc(p2pAddress, configuration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle(rpc.nodeIdentity(), rpc, configuration, webAddress, process)
}
}
}
}
override fun startNodes(nodes: List<CordformNode>): List<ListenableFuture<NodeHandle>> {
val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes) val networkMapServiceConfigLookup = networkMapServiceConfigLookup(nodes)
return nodes.map { return nodes.map { node ->
val p2pAddress = HostAndPort.fromString(it.config.getString("p2pAddress")); portAllocation.nextHostAndPort()
portAllocation.nextHostAndPort() // rpcAddress portAllocation.nextHostAndPort() // rpcAddress
val webAddress = portAllocation.nextHostAndPort() val webAddress = portAllocation.nextHostAndPort()
val name = X500Name(it.name) val name = X500Name(node.name)
startNode(p2pAddress, webAddress, name, it.config + mapOf(
"extraAdvertisedServiceIds" to it.advertisedServices, val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = node.config + mapOf(
"extraAdvertisedServiceIds" to node.advertisedServices,
"networkMapService" to networkMapServiceConfigLookup(name), "networkMapService" to networkMapServiceConfigLookup(name),
"rpcUsers" to it.rpcUsers, "rpcUsers" to node.rpcUsers,
"notaryClusterAddresses" to it.notaryClusterAddresses "notaryClusterAddresses" to node.notaryClusterAddresses
)) )
)
startNodeInternal(config, webAddress, startInSameProcess)
} }
} }
@ -559,7 +608,8 @@ class DriverDSL(
clusterSize: Int, clusterSize: Int,
type: ServiceType, type: ServiceType,
verifierType: VerifierType, verifierType: VerifierType,
rpcUsers: List<User> rpcUsers: List<User>,
startInSameProcess: Boolean?
): ListenableFuture<Pair<Party, List<NodeHandle>>> { ): ListenableFuture<Pair<Party, List<NodeHandle>>> {
val nodeNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") } val nodeNames = (0 until clusterSize).map { DUMMY_NOTARY.name.appendToCommonName(" $it") }
val paths = nodeNames.map { baseDirectory(it) } val paths = nodeNames.map { baseDirectory(it) }
@ -568,7 +618,14 @@ class DriverDSL(
val notaryClusterAddress = portAllocation.nextHostAndPort() val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster // Start the first node that will bootstrap the cluster
val firstNotaryFuture = startNode(nodeNames.first(), advertisedServices, rpcUsers, verifierType, mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) val firstNotaryFuture = startNode(
providedName = nodeNames.first(),
advertisedServices = advertisedServices,
rpcUsers = rpcUsers,
verifierType = verifierType,
customOverrides = mapOf("notaryNodeAddress" to notaryClusterAddress.toString()),
startInSameProcess = startInSameProcess
)
// All other nodes will join the cluster // All other nodes will join the cluster
val restNotaryFutures = nodeNames.drop(1).map { val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort() val nodeAddress = portAllocation.nextHostAndPort()
@ -613,6 +670,8 @@ class DriverDSL(
Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build()) Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
) )
_shutdownManager = ShutdownManager(executorService) _shutdownManager = ShutdownManager(executorService)
// We set this property so that in-process nodes find cordapps. Out-of-process nodes need this passed in when started.
System.setProperty("net.corda.node.cordapp.scan.package", callerPackage)
if (networkMapStartStrategy.startDedicated) { if (networkMapStartStrategy.startDedicated) {
startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes. startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes.
} }
@ -620,9 +679,8 @@ class DriverDSL(
override fun baseDirectory(nodeName: X500Name): Path = driverDirectory / nodeName.commonName.replace(WHITESPACE, "") override fun baseDirectory(nodeName: X500Name): Path = driverDirectory / nodeName.commonName.replace(WHITESPACE, "")
override fun startDedicatedNetworkMapService(): ListenableFuture<Unit> { override fun startDedicatedNetworkMapService(startInProcess: Boolean?): ListenableFuture<out NodeHandle> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val webAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort().toString()
val networkMapLegalName = networkMapStartStrategy.legalName val networkMapLegalName = networkMapStartStrategy.legalName
val config = ConfigHelper.loadConfig( val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(networkMapLegalName), baseDirectory = baseDirectory(networkMapLegalName),
@ -631,16 +689,44 @@ class DriverDSL(
"myLegalName" to networkMapLegalName.toString(), "myLegalName" to networkMapLegalName.toString(),
// TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all // TODO: remove the webAddress as NMS doesn't need to run a web server. This will cause all
// node port numbers to be shifted, so all demos and docs need to be updated accordingly. // node port numbers to be shifted, so all demos and docs need to be updated accordingly.
"webAddress" to apiAddress, "webAddress" to webAddress.toString(),
"p2pAddress" to dedicatedNetworkMapAddress.toString(), "p2pAddress" to dedicatedNetworkMapAddress.toString(),
"useTestClock" to useTestClock "useTestClock" to useTestClock
) )
) )
return startNodeInternal(config, webAddress, startInProcess)
}
log.info("Starting network-map-service") private fun startNodeInternal(config: Config, webAddress: HostAndPort, startInProcess: Boolean?): ListenableFuture<out NodeHandle> {
val startNode = startNode(executorService, config.parseAs<FullNodeConfiguration>(), config, quasarJarPath, debugPort, systemProperties) val nodeConfiguration = config.parseAs<FullNodeConfiguration>()
registerProcess(startNode) if (startInProcess ?: startNodesInProcess) {
return startNode.flatMap { addressMustBeBoundFuture(executorService, dedicatedNetworkMapAddress, it) } val nodeAndThreadFuture = startInProcessNode(executorService, nodeConfiguration, config)
shutdownManager.registerShutdown(
nodeAndThreadFuture.map { (node, thread) -> {
node.stop()
thread.interrupt()
} }
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread)
}
}
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, callerPackage)
registerProcess(processFuture)
return processFuture.flatMap { process ->
// We continue to use SSL enabled port for RPC when its for node user.
establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc ->
rpc.waitUntilRegisteredWithNetworkMap().map {
NodeHandle.OutOfProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, debugPort, process)
}
}
}
}
} }
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): ListenableFuture<A> { override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): ListenableFuture<A> {
@ -658,20 +744,37 @@ class DriverDSL(
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)] private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]
private fun startNode( private fun startInProcessNode(
executorService: ListeningScheduledExecutorService,
nodeConf: FullNodeConfiguration,
config: Config
): ListenableFuture<Pair<Node, Thread>> {
return executorService.submit<Pair<Node, Thread>> {
log.info("Starting in-process Node ${nodeConf.myLegalName.commonName}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val clock: Clock = if (nodeConf.useTestClock) TestClock() else NodeClock()
// TODO pass the version in?
val node = Node(nodeConf, nodeConf.calculateServices(), MOCK_VERSION_INFO, clock)
node.start()
val nodeThread = thread(name = nodeConf.myLegalName.commonName) {
node.run()
}
node to nodeThread
}.flatMap { nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread } }
}
private fun startOutOfProcessNode(
executorService: ListeningScheduledExecutorService, executorService: ListeningScheduledExecutorService,
nodeConf: FullNodeConfiguration, nodeConf: FullNodeConfiguration,
config: Config, config: Config,
quasarJarPath: String, quasarJarPath: String,
debugPort: Int?, debugPort: Int?,
overriddenSystemProperties: Map<String, String> overriddenSystemProperties: Map<String, String>,
callerPackage: String
): ListenableFuture<Process> { ): ListenableFuture<Process> {
// Get the package of the caller of the driver and pass this to the node for CorDapp scanning
val callerPackage = Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`.name }
val processFuture = executorService.submit<Process> { val processFuture = executorService.submit<Process> {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.commonName}")
// Write node.conf // Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config) writeConfig(nodeConf.baseDirectory, "node.conf", config)
@ -726,6 +829,13 @@ class DriverDSL(
) )
}.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } } }.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } }
} }
private fun getCallerPackage(): String {
return Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`.name }
}
} }
} }

View File

@ -0,0 +1,108 @@
package net.corda.testing.performance
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch
import net.corda.core.utilities.Rate
import net.corda.testing.driver.ShutdownManager
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock
fun startTightLoopInjector(
parallelism: Int,
numberOfInjections: Int,
queueBound: Int,
work: () -> Unit
) {
ShutdownManager.run {
val executor = Executors.newFixedThreadPool(parallelism)
registerShutdown { executor.shutdown() }
val remainingLatch = CountDownLatch(numberOfInjections)
val queuedCount = AtomicInteger(0)
val lock = ReentrantLock()
val canQueueAgain = lock.newCondition()
val injector = thread(name = "injector") {
val leftToSubmit = AtomicInteger(numberOfInjections)
while (true) {
if (leftToSubmit.getAndDecrement() == 0) break
executor.submit {
work()
if (queuedCount.decrementAndGet() < queueBound / 2) {
lock.withLock {
canQueueAgain.signal()
}
}
remainingLatch.countDown()
}
if (queuedCount.incrementAndGet() > queueBound) {
lock.withLock {
canQueueAgain.await()
}
}
}
}
registerShutdown { injector.interrupt() }
remainingLatch.await()
injector.join()
}
}
fun startPublishingFixedRateInjector(
metricRegistry: MetricRegistry,
parallelism: Int,
overallDuration: Duration,
injectionRate: Rate,
queueSizeMetricName: String = "QueueSize",
workDurationMetricName: String = "WorkDuration",
work: () -> Unit
) {
val workSemaphore = Semaphore(0)
metricRegistry.register(queueSizeMetricName, Gauge { workSemaphore.availablePermits() })
val workDurationTimer = metricRegistry.timer(workDurationMetricName)
ShutdownManager.run {
val executor = Executors.newSingleThreadScheduledExecutor()
registerShutdown { executor.shutdown() }
val workExecutor = Executors.newFixedThreadPool(parallelism)
registerShutdown { workExecutor.shutdown() }
val timings = Collections.synchronizedList(ArrayList<Long>())
for (i in 1..parallelism) {
workExecutor.submit {
try {
while (true) {
workSemaphore.acquire()
workDurationTimer.time {
timings.add(
Stopwatch.createStarted().apply {
work()
}.stop().elapsed(TimeUnit.MICROSECONDS)
)
}
}
} catch (throwable: Throwable) {
throwable.printStackTrace()
}
}
}
val injector = executor.scheduleAtFixedRate(
{
workSemaphore.release((injectionRate * TimeUnit.SECONDS).toInt())
},
0,
1,
TimeUnit.SECONDS
)
registerShutdown {
injector.cancel(true)
}
Thread.sleep(overallDuration.toMillis())
}
}

View File

@ -0,0 +1,34 @@
package net.corda.testing.performance
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricRegistry
import net.corda.testing.driver.ShutdownManager
import java.util.concurrent.TimeUnit
import javax.management.ObjectName
import kotlin.concurrent.thread
fun startReporter(shutdownManager: ShutdownManager, metricRegistry: MetricRegistry = MetricRegistry()): MetricRegistry {
val jmxReporter = thread {
JmxReporter.
forRegistry(metricRegistry).
inDomain("net.corda").
createsObjectNamesWith { _, domain, name ->
// Make the JMX hierarchy a bit better organised.
val category = name.substringBefore('.')
val subName = name.substringAfter('.', "")
if (subName == "")
ObjectName("$domain:name=$category")
else
ObjectName("$domain:type=$category,name=$subName")
}.
build().
start()
}
shutdownManager.registerShutdown { jmxReporter.interrupt() }
val consoleReporter = thread {
ConsoleReporter.forRegistry(metricRegistry).build().start(1, TimeUnit.SECONDS)
}
shutdownManager.registerShutdown { consoleReporter.interrupt() }
return metricRegistry
}

View File

@ -80,6 +80,7 @@ fun <A> verifierDriver(
systemProperties: Map<String, String> = emptyMap(), systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false, useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false), networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
startNodesInProcess: Boolean = false,
dsl: VerifierExposedDSLInterface.() -> A dsl: VerifierExposedDSLInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = VerifierDriverDSL( driverDsl = VerifierDriverDSL(
@ -90,7 +91,8 @@ fun <A> verifierDriver(
driverDirectory = driverDirectory.toAbsolutePath(), driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock, useTestClock = useTestClock,
networkMapStartStrategy = networkMapStartStrategy, networkMapStartStrategy = networkMapStartStrategy,
isDebug = isDebug isDebug = isDebug,
startNodesInProcess = startNodesInProcess
) )
), ),
coerce = { it }, coerce = { it },