diff --git a/.idea/compiler.xml b/.idea/compiler.xml index c431af7fa7..67675cf9a5 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -16,6 +16,9 @@ + + + @@ -43,6 +46,8 @@ + + @@ -54,8 +59,13 @@ + + + + + @@ -106,6 +116,9 @@ + + + diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index f395e9b831..b8a7353ef3 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -83,6 +83,8 @@ UNRELEASED * Replaced node configuration parameter ``certificateSigningService`` with ``compatibilityZoneURL``, which is Corda compatibility zone network management service's address. +* ``waitForAllNodesToFinish()`` method in ``DriverDSLExposedInterface`` has instead become a parameter on driver creation. + .. _changelog_v1: Release 1.0 diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt index 1f20376fbd..f68bc82edd 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/ClientRpcTutorial.kt @@ -48,7 +48,7 @@ fun main(args: Array) { startFlow(), invokeRpc(CordaRPCOps::nodeInfo) )) - driver(driverDirectory = baseDirectory, extraCordappPackagesToScan = listOf("net.corda.finance")) { + driver(driverDirectory = baseDirectory, extraCordappPackagesToScan = listOf("net.corda.finance"), waitForAllNodesToFinish = true) { val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user)).get() // END 1 @@ -96,7 +96,6 @@ fun main(args: Array) { graph.display() } } - waitForAllNodesToFinish() // END 5 } } diff --git a/docs/source/tutorial-cordapp.rst b/docs/source/tutorial-cordapp.rst index 7073a67af3..d8dfcff998 100644 --- a/docs/source/tutorial-cordapp.rst +++ b/docs/source/tutorial-cordapp.rst @@ -283,7 +283,7 @@ IntelliJ fun main(args: Array) { // No permissions required as we are not invoking flows. val user = User("user1", "test", permissions = setOf()) - driver(isDebug = true) { + driver(isDebug = true, waitForNodesToFinish = true) { startNode(getX500Name(O="Controller",L="London",C='GB"), setOf(ServiceInfo(ValidatingNotaryService.type))) val (nodeA, nodeB, nodeC) = Futures.allAsList( startNode(getX500Name(O="PartyA",L="London",C="GB"), rpcUsers = listOf(user)), @@ -293,8 +293,6 @@ IntelliJ startWebserver(nodeA) startWebserver(nodeB) startWebserver(nodeC) - - waitForAllNodesToFinish() } } @@ -505,7 +503,7 @@ Debugging is done via IntelliJ as follows: fun main(args: Array) { // No permissions required as we are not invoking flows. val user = User("user1", "test", permissions = setOf()) - driver(isDebug = true) { + driver(isDebug = true, waitForNodesToFinish = true) { startNode(getX500Name(O="Controller",L="London",C="GB"), setOf(ServiceInfo(ValidatingNotaryService.type))) val (nodeA, nodeB, nodeC) = Futures.allAsList( startNode(getX500Name(O="PartyA",L=London,C=GB"), rpcUsers = listOf(user)), @@ -515,8 +513,6 @@ Debugging is done via IntelliJ as follows: startWebserver(nodeA) startWebserver(nodeB) startWebserver(nodeC) - - waitForAllNodesToFinish() } } diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/Main.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/Main.kt index ac2c59abac..eafecb2fa8 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/Main.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/Main.kt @@ -12,9 +12,8 @@ import net.corda.testing.driver.driver */ fun main(args: Array) { val demoUser = listOf(User("demo", "demo", setOf("StartFlow.net.corda.flows.FinalityFlow"))) - driver(isDebug = true, driverDirectory = "build" / "attachment-demo-nodes") { + driver(isDebug = true, driverDirectory = "build" / "attachment-demo-nodes", waitForAllNodesToFinish = true) { startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser) startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser) - waitForAllNodesToFinish() } } diff --git a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/BankOfCordaDriver.kt b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/BankOfCordaDriver.kt index a0e7752640..f836788d71 100644 --- a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/BankOfCordaDriver.kt +++ b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/BankOfCordaDriver.kt @@ -56,7 +56,7 @@ private class BankOfCordaDriver { try { when (role) { Role.ISSUER -> { - driver(isDebug = true, extraCordappPackagesToScan = listOf("net.corda.finance.contracts.asset")) { + driver(isDebug = true, extraCordappPackagesToScan = listOf("net.corda.finance.contracts.asset"), waitForAllNodesToFinish = true) { val bankUser = User( BANK_USERNAME, "test", @@ -76,7 +76,6 @@ private class BankOfCordaDriver { startFlow())) startNode(providedName = BIGCORP_LEGAL_NAME, rpcUsers = listOf(bigCorpUser)) startWebserver(bankOfCorda.get()) - waitForAllNodesToFinish() } } else -> { diff --git a/samples/irs-demo/cordapp/src/test/kotlin/net/corda/irs/Main.kt b/samples/irs-demo/cordapp/src/test/kotlin/net/corda/irs/Main.kt index 64acf2c4ac..91315e3e9f 100644 --- a/samples/irs-demo/cordapp/src/test/kotlin/net/corda/irs/Main.kt +++ b/samples/irs-demo/cordapp/src/test/kotlin/net/corda/irs/Main.kt @@ -10,7 +10,7 @@ import net.corda.testing.driver.driver * Do not use in a production environment. */ fun main(args: Array) { - driver(useTestClock = true, isDebug = true) { + driver(useTestClock = true, isDebug = true, waitForAllNodesToFinish = true) { val (nodeA, nodeB) = listOf( startNode(providedName = DUMMY_BANK_A.name), startNode(providedName = DUMMY_BANK_B.name) @@ -20,7 +20,5 @@ fun main(args: Array) { startWebserver(controller) startWebserver(nodeA) startWebserver(nodeB) - - waitForAllNodesToFinish() } } diff --git a/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt b/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt index d5a98679a6..204e981c70 100644 --- a/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt +++ b/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt @@ -1,8 +1,6 @@ package net.corda.test.spring import net.corda.core.concurrent.CordaFuture -import net.corda.core.internal.concurrent.flatMap -import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.map import net.corda.core.utilities.loggerFor import net.corda.testing.driver.* @@ -14,7 +12,6 @@ import java.net.ConnectException import java.net.URL import java.nio.file.Path import java.nio.file.Paths -import java.util.concurrent.ExecutorService import java.util.concurrent.TimeUnit interface SpringDriverExposedDSLInterface : DriverDSLExposedInterface { @@ -62,17 +59,17 @@ fun springDriver( coerce = { it }, dsl = dsl ) -data class SpringBootDriverDSL( - val driverDSL: DriverDSL -) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface { - - val log = loggerFor() +data class SpringBootDriverDSL(private val driverDSL: DriverDSL) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface { + companion object { + val log = loggerFor() + } override fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture { val debugPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null - val processFuture = startApplication(driverDSL.executorService, handle, debugPort, clazz) - driverDSL.registerProcess(processFuture) - return processFuture.map { queryWebserver(handle, it, checkUrl) } + val process = startApplication(handle, debugPort, clazz) + driverDSL.shutdownManager.registerProcessShutdown(process) + val webReadyFuture = addressMustBeBoundFuture(driverDSL.executorService, handle.webAddress, process) + return webReadyFuture.map { queryWebserver(handle, process, checkUrl) } } private fun queryWebserver(handle: NodeHandle, process: Process, checkUrl: String): WebserverHandle { @@ -99,29 +96,27 @@ data class SpringBootDriverDSL( throw IllegalStateException("Webserver at ${handle.webAddress} has died or was not reachable at URL ${url}") } - private fun startApplication(executorService: ExecutorService, handle: NodeHandle, debugPort: Int?, clazz: Class<*>): CordaFuture { - return executorService.fork { - val className = clazz.canonicalName - ProcessUtilities.startJavaProcessImpl( - className = className, // cannot directly get class for this, so just use string - jdwpPort = debugPort, - extraJvmArguments = listOf( - "-Dname=node-${handle.configuration.p2pAddress}-webserver", - "-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" - // Inherit from parent process - ), - classpath = ProcessUtilities.defaultClassPath, - workingDirectory = handle.configuration.baseDirectory, - errorLogPath = Paths.get("error.$className.log"), - arguments = listOf( - "--base-directory", handle.configuration.baseDirectory.toString(), - "--server.port=${handle.webAddress.port}", - "--corda.host=${handle.configuration.rpcAddress}", - "--corda.user=${handle.configuration.rpcUsers.first().username}", - "--corda.password=${handle.configuration.rpcUsers.first().password}" - ), - maximumHeapSize = null - ) - }.flatMap { process -> addressMustBeBoundFuture(driverDSL.executorService, handle.webAddress, process).map { process } } + private fun startApplication(handle: NodeHandle, debugPort: Int?, clazz: Class<*>): Process { + val className = clazz.canonicalName + return ProcessUtilities.startJavaProcessImpl( + className = className, // cannot directly get class for this, so just use string + jdwpPort = debugPort, + extraJvmArguments = listOf( + "-Dname=node-${handle.configuration.p2pAddress}-webserver", + "-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" + // Inherit from parent process + ), + classpath = ProcessUtilities.defaultClassPath, + workingDirectory = handle.configuration.baseDirectory, + errorLogPath = Paths.get("error.$className.log"), + arguments = listOf( + "--base-directory", handle.configuration.baseDirectory.toString(), + "--server.port=${handle.webAddress.port}", + "--corda.host=${handle.configuration.rpcAddress}", + "--corda.user=${handle.configuration.rpcUsers.first().username}", + "--corda.password=${handle.configuration.rpcUsers.first().password}" + ), + maximumHeapSize = null + ) } } \ No newline at end of file diff --git a/samples/simm-valuation-demo/src/test/kotlin/net/corda/vega/Main.kt b/samples/simm-valuation-demo/src/test/kotlin/net/corda/vega/Main.kt index c6bdcb2a2b..41d82323bd 100644 --- a/samples/simm-valuation-demo/src/test/kotlin/net/corda/vega/Main.kt +++ b/samples/simm-valuation-demo/src/test/kotlin/net/corda/vega/Main.kt @@ -12,7 +12,7 @@ import net.corda.testing.driver.driver * via the web api. */ fun main(args: Array) { - driver(isDebug = true) { + driver(isDebug = true, waitForAllNodesToFinish = true) { val (nodeA, nodeB, nodeC) = listOf( startNode(providedName = DUMMY_BANK_A.name), startNode(providedName = DUMMY_BANK_B.name), @@ -22,7 +22,5 @@ fun main(args: Array) { startWebserver(nodeA) startWebserver(nodeB) startWebserver(nodeC) - - waitForAllNodesToFinish() } } diff --git a/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/Main.kt b/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/Main.kt index 3413a012f5..dce315e84e 100644 --- a/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/Main.kt +++ b/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/Main.kt @@ -22,13 +22,12 @@ fun main(args: Array) { startFlow(), all()) val demoUser = listOf(User("demo", "demo", permissions)) - driver(driverDirectory = "build" / "trader-demo-nodes", isDebug = true) { + driver(driverDirectory = "build" / "trader-demo-nodes", isDebug = true, waitForAllNodesToFinish = true) { val user = User("user1", "test", permissions = setOf(startFlow(), startFlow(), startFlow())) startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser) startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser) startNode(providedName = BOC.name, rpcUsers = listOf(user)) - waitForAllNodesToFinish() } } diff --git a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index de8d83cc82..39500c7834 100644 --- a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -7,15 +7,16 @@ import net.corda.core.internal.readLines import net.corda.core.utilities.getOrThrow import net.corda.node.internal.NodeStartup import net.corda.testing.DUMMY_BANK_A +import net.corda.testing.DUMMY_NOTARY import net.corda.testing.DUMMY_REGULATOR import net.corda.testing.ProjectStructure.projectRootDir +import net.corda.testing.node.NotarySpec import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService class DriverTests { - companion object { private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2) @@ -62,4 +63,19 @@ class DriverTests { assertThat(debugLinesPresent).isTrue() } } + + @Test + fun `started node, which is not waited for in the driver, is shutdown when the driver exits`() { + // First check that the process-id file is created by the node on startup, so that we can be sure our check that + // it's deleted on shutdown isn't a false-positive. + driver { + val baseDirectory = defaultNotaryNode.getOrThrow().configuration.baseDirectory + assertThat(baseDirectory / "process-id").exists() + } + + val baseDirectory = driver(notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name))) { + (this as DriverDSL).baseDirectory(DUMMY_NOTARY.name) + } + assertThat(baseDirectory / "process-id").doesNotExist() + } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 605a4deba0..1fd398e6ac 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -37,7 +37,6 @@ import net.corda.nodeapi.internal.addShutdownHook import net.corda.testing.* import net.corda.testing.common.internal.NetworkParametersCopier import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.setGlobalSerialization import net.corda.testing.internal.ProcessUtilities import net.corda.testing.node.ClusterSpec import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO @@ -61,10 +60,8 @@ import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicInteger -import kotlin.collections.ArrayList import kotlin.concurrent.thread - /** * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests. * @@ -172,8 +169,6 @@ interface DriverDSLExposedInterface : CordformContext { */ fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture - fun waitForAllNodesToFinish() - /** * Polls a function until it returns a non-null value. Note that there is no timeout on the polling. * @@ -338,6 +333,7 @@ fun driver( useTestClock: Boolean = defaultParameters.useTestClock, initialiseSerialization: Boolean = defaultParameters.initialiseSerialization, startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, + waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish, notarySpecs: List = defaultParameters.notarySpecs, extraCordappPackagesToScan: List = defaultParameters.extraCordappPackagesToScan, dsl: DriverDSLExposedInterface.() -> A @@ -351,6 +347,7 @@ fun driver( useTestClock = useTestClock, isDebug = isDebug, startNodesInProcess = startNodesInProcess, + waitForNodesToFinish = waitForAllNodesToFinish, notarySpecs = notarySpecs, extraCordappPackagesToScan = extraCordappPackagesToScan ), @@ -385,6 +382,7 @@ data class DriverParameters( val useTestClock: Boolean = false, val initialiseSerialization: Boolean = true, val startNodesInProcess: Boolean = false, + val waitForNodesToFinish: Boolean = false, val notarySpecs: List = listOf(NotarySpec(DUMMY_NOTARY.name)), val extraCordappPackagesToScan: List = emptyList() ) { @@ -396,6 +394,7 @@ data class DriverParameters( fun setUseTestClock(useTestClock: Boolean) = copy(useTestClock = useTestClock) fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization) fun setStartNodesInProcess(startNodesInProcess: Boolean) = copy(startNodesInProcess = startNodesInProcess) + fun setTerminateNodesOnShutdown(terminateNodesOnShutdown: Boolean) = copy(waitForNodesToFinish = terminateNodesOnShutdown) fun setExtraCordappPackagesToScan(extraCordappPackagesToScan: List) = copy(extraCordappPackagesToScan = extraCordappPackagesToScan) fun setNotarySpecs(notarySpecs: List) = copy(notarySpecs = notarySpecs) } @@ -446,6 +445,7 @@ fun genericD systemProperties: Map = defaultParameters.systemProperties, useTestClock: Boolean = defaultParameters.useTestClock, initialiseSerialization: Boolean = defaultParameters.initialiseSerialization, + waitForNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish, startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, notarySpecs: List, extraCordappPackagesToScan: List = defaultParameters.extraCordappPackagesToScan, @@ -462,6 +462,7 @@ fun genericD useTestClock = useTestClock, isDebug = isDebug, startNodesInProcess = startNodesInProcess, + waitForNodesToFinish = waitForNodesToFinish, extraCordappPackagesToScan = extraCordappPackagesToScan, notarySpecs = notarySpecs ) @@ -567,6 +568,7 @@ class DriverDSL( val useTestClock: Boolean, val isDebug: Boolean, val startNodesInProcess: Boolean, + val waitForNodesToFinish: Boolean, extraCordappPackagesToScan: List, val notarySpecs: List ) : DriverDSLInternalInterface { @@ -586,7 +588,7 @@ class DriverDSL( private lateinit var networkParameters: NetworkParametersCopier class State { - val processes = ArrayList>() + val processes = ArrayList() } private val state = ThreadBox(State()) @@ -600,20 +602,12 @@ class DriverDSL( Paths.get(quasarFileUrl.toURI()).toString() } - fun registerProcess(process: CordaFuture) { - shutdownManager.registerProcessShutdown(process) - state.locked { - processes.add(process) - } - } - - override fun waitForAllNodesToFinish() = state.locked { - processes.transpose().get().forEach { - it.waitFor() - } - } - override fun shutdown() { + if (waitForNodesToFinish) { + state.locked { + processes.forEach { it.waitFor() } + } + } _shutdownManager?.shutdown() _executorService?.shutdownNow() } @@ -708,9 +702,10 @@ class DriverDSL( override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture { val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val processFuture = DriverDSL.startWebserver(executorService, handle, debugPort, maximumHeapSize) - registerProcess(processFuture) - return processFuture.map { queryWebserver(handle, it) } + val process = DriverDSL.startWebserver(handle, debugPort, maximumHeapSize) + shutdownManager.registerProcessShutdown(process) + val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process) + return webReadyFuture.map { queryWebserver(handle, process) } } override fun start() { @@ -892,18 +887,22 @@ class DriverDSL( } } else { val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val processFuture = startOutOfProcessNode(executorService, configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize) - registerProcess(processFuture) - return processFuture.flatMap { process -> + val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize) + if (waitForNodesToFinish) { + state.locked { + processes += process + } + } else { + shutdownManager.registerProcessShutdown(process) + } + val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process) + return p2pReadyFuture.flatMap { val processDeathFuture = poll(executorService, "process death") { if (process.isAlive) null else process } establishRpc(configuration, processDeathFuture).flatMap { rpc -> // Check for all nodes to have all other nodes in background in case RPC is failing over: - val forked = executorService.fork { - allNodesConnected(rpc) - } - val networkMapFuture = forked.flatMap { it } + val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } firstOf(processDeathFuture, networkMapFuture) { if (it == processDeathFuture) { throw ListenProcessDeathException(configuration.p2pAddress, process) @@ -962,7 +961,6 @@ class DriverDSL( } private fun startOutOfProcessNode( - executorService: ScheduledExecutorService, nodeConf: NodeConfiguration, config: Config, quasarJarPath: String, @@ -970,70 +968,58 @@ class DriverDSL( overriddenSystemProperties: Map, cordappPackages: List, maximumHeapSize: String - ): CordaFuture { - val processFuture = executorService.fork { - log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled")) - // Write node.conf - writeConfig(nodeConf.baseDirectory, "node.conf", config) + ): Process { + log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled")) + // Write node.conf + writeConfig(nodeConf.baseDirectory, "node.conf", config) - val systemProperties = overriddenSystemProperties + mapOf( - "name" to nodeConf.myLegalName, - "visualvm.display.name" to "corda-${nodeConf.myLegalName}", - Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator), - "java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process - ) - // See experimental/quasar-hook/README.md for how to generate. - val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" + - "com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" + - "com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" + - "io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" + - "org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" + - "org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" + - "org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)" - val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } + - "-javaagent:$quasarJarPath=$excludePattern" - val loggingLevel = if (debugPort == null) "INFO" else "DEBUG" + val systemProperties = overriddenSystemProperties + mapOf( + "name" to nodeConf.myLegalName, + "visualvm.display.name" to "corda-${nodeConf.myLegalName}", + Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator), + "java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process + ) + // See experimental/quasar-hook/README.md for how to generate. + val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" + + "com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" + + "com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" + + "io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" + + "org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" + + "org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" + + "org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)" + val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } + + "-javaagent:$quasarJarPath=$excludePattern" + val loggingLevel = if (debugPort == null) "INFO" else "DEBUG" - ProcessUtilities.startCordaProcess( - className = "net.corda.node.Corda", // cannot directly get class for this, so just use string - arguments = listOf( - "--base-directory=${nodeConf.baseDirectory}", - "--logging-level=$loggingLevel", - "--no-local-shell" - ), - jdwpPort = debugPort, - extraJvmArguments = extraJvmArguments, - errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log", - workingDirectory = nodeConf.baseDirectory, - maximumHeapSize = maximumHeapSize - ) - } - return processFuture.flatMap { process -> - addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process } - } + return ProcessUtilities.startCordaProcess( + className = "net.corda.node.Corda", // cannot directly get class for this, so just use string + arguments = listOf( + "--base-directory=${nodeConf.baseDirectory}", + "--logging-level=$loggingLevel", + "--no-local-shell" + ), + jdwpPort = debugPort, + extraJvmArguments = extraJvmArguments, + errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log", + workingDirectory = nodeConf.baseDirectory, + maximumHeapSize = maximumHeapSize + ) } - private fun startWebserver( - executorService: ScheduledExecutorService, - handle: NodeHandle, - debugPort: Int?, - maximumHeapSize: String - ): CordaFuture { - return executorService.fork { - val className = "net.corda.webserver.WebServer" - ProcessUtilities.startCordaProcess( - className = className, // cannot directly get class for this, so just use string - arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()), - jdwpPort = debugPort, - extraJvmArguments = listOf( - "-Dname=node-${handle.configuration.p2pAddress}-webserver", - "-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process - ), - errorLogPath = Paths.get("error.$className.log"), - workingDirectory = null, - maximumHeapSize = maximumHeapSize - ) - }.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } } + private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process { + val className = "net.corda.webserver.WebServer" + return ProcessUtilities.startCordaProcess( + className = className, // cannot directly get class for this, so just use string + arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()), + jdwpPort = debugPort, + extraJvmArguments = listOf( + "-Dname=node-${handle.configuration.p2pAddress}-webserver", + "-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process + ), + errorLogPath = Paths.get("error.$className.log"), + workingDirectory = null, + maximumHeapSize = maximumHeapSize + ) } private fun getCallerPackage(): String { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt index e476585750..9ca4875c4c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt @@ -3,14 +3,12 @@ package net.corda.testing.driver import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.ThreadBox import net.corda.core.internal.concurrent.doneFuture -import net.corda.core.internal.concurrent.map import net.corda.core.utilities.Try import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.loggerFor import net.corda.core.utilities.seconds import java.util.concurrent.ExecutorService import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger @@ -46,6 +44,7 @@ class ShutdownManager(private val executorService: ExecutorService) { registeredShutdowns } } + val shutdowns = shutdownActionFutures.map { Try.on { it.getOrThrow(1.seconds) } } shutdowns.reversed().forEach { when (it) { @@ -63,30 +62,26 @@ class ShutdownManager(private val executorService: ExecutorService) { fun registerShutdown(shutdown: CordaFuture<() -> Unit>) { state.locked { require(!isShutdown) - registeredShutdowns.add(shutdown) + registeredShutdowns += shutdown } } fun registerShutdown(shutdown: () -> Unit) = registerShutdown(doneFuture(shutdown)) - fun registerProcessShutdown(processFuture: CordaFuture) { - val processShutdown = processFuture.map { process -> - { - process.destroy() - /** Wait 5 seconds, then [Process.destroyForcibly] */ - val finishedFuture = executorService.submit { - process.waitFor() - } - try { - finishedFuture.get(5, TimeUnit.SECONDS) - } catch (exception: TimeoutException) { - finishedFuture.cancel(true) - process.destroyForcibly() - } - Unit + fun registerProcessShutdown(process: Process) { + registerShutdown { + process.destroy() + /** Wait 5 seconds, then [Process.destroyForcibly] */ + val finishedFuture = executorService.submit { + process.waitFor() + } + try { + finishedFuture.getOrThrow(5.seconds) + } catch (timeout: TimeoutException) { + finishedFuture.cancel(true) + process.destroyForcibly() } } - registerShutdown(processShutdown) } interface Follower { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt index 4a4fc1476f..7a862e9296 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt @@ -7,6 +7,7 @@ import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.random63BitValue import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.map import net.corda.core.internal.div @@ -229,6 +230,7 @@ fun rpcDriver( useTestClock: Boolean = false, initialiseSerialization: Boolean = true, startNodesInProcess: Boolean = false, + waitForNodesToFinish: Boolean = false, extraCordappPackagesToScan: List = emptyList(), notarySpecs: List = emptyList(), dsl: RPCDriverExposedDSLInterface.() -> A @@ -242,6 +244,7 @@ fun rpcDriver( useTestClock = useTestClock, isDebug = isDebug, startNodesInProcess = startNodesInProcess, + waitForNodesToFinish = waitForNodesToFinish, extraCordappPackagesToScan = extraCordappPackagesToScan, notarySpecs = notarySpecs ) @@ -404,11 +407,9 @@ data class RPCDriverDSL( } override fun startRandomRpcClient(rpcOpsClass: Class, rpcAddress: NetworkHostAndPort, username: String, password: String): CordaFuture { - val processFuture = driverDSL.executorService.fork { - ProcessUtilities.startJavaProcess(listOf(rpcOpsClass.name, rpcAddress.toString(), username, password)) - } - driverDSL.shutdownManager.registerProcessShutdown(processFuture) - return processFuture + val process = ProcessUtilities.startJavaProcess(listOf(rpcOpsClass.name, rpcAddress.toString(), username, password)) + driverDSL.shutdownManager.registerProcessShutdown(process) + return doneFuture(process) } override fun startArtemisSession(rpcAddress: NetworkHostAndPort, username: String, password: String): ClientSession { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt index a38a5d78f1..958ac48503 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt @@ -16,12 +16,9 @@ fun CordformDefinition.clean() { * Creates and starts all nodes required for the demo. */ // TODO add notaries to cordform! -fun CordformDefinition.runNodes() = driver( - isDebug = true, - driverDirectory = driverDirectory, - portAllocation = PortAllocation.Incremental(10001) -) { - setup(this) - startNodes(nodeConfigurers.map { configurer -> CordformNode().also { configurer.accept(it) } }) - waitForAllNodesToFinish() +fun CordformDefinition.runNodes() { + driver(isDebug = true, driverDirectory = driverDirectory, portAllocation = PortAllocation.Incremental(10001), waitForAllNodesToFinish = true) { + setup(this) + startNodes(nodeConfigurers.map { configurer -> CordformNode().also { configurer.accept(it) } }) + } } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/ExplorerSimulation.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/ExplorerSimulation.kt index 3bd939d4ab..1e3d0a2707 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/ExplorerSimulation.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/ExplorerSimulation.kt @@ -64,7 +64,7 @@ class ExplorerSimulation(private val options: OptionSet) { private fun startDemoNodes() { val portAllocation = PortAllocation.Incremental(20000) - driver(portAllocation = portAllocation, extraCordappPackagesToScan = listOf("net.corda.finance")) { + driver(portAllocation = portAllocation, extraCordappPackagesToScan = listOf("net.corda.finance"), waitForAllNodesToFinish = true) { // TODO : Supported flow should be exposed somehow from the node instead of set of ServiceInfo. val alice = startNode(providedName = ALICE.name, rpcUsers = listOf(user)) val bob = startNode(providedName = BOB.name, rpcUsers = listOf(user)) @@ -89,8 +89,6 @@ class ExplorerSimulation(private val options: OptionSet) { options.has("S") -> startNormalSimulation() options.has("F") -> startErrorFlowsSimulation() } - - waitForAllNodesToFinish() } } diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt index f613c0584d..ecacae7cdb 100644 --- a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt @@ -5,7 +5,10 @@ import com.typesafe.config.ConfigFactory import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.random63BitValue import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.concurrent.* +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.internal.concurrent.fork +import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.createDirectories import net.corda.core.internal.div import net.corda.core.serialization.internal.nodeSerializationEnv @@ -79,6 +82,7 @@ fun verifierDriver( systemProperties: Map = emptyMap(), useTestClock: Boolean = false, startNodesInProcess: Boolean = false, + waitForNodesToFinish: Boolean = false, extraCordappPackagesToScan: List = emptyList(), notarySpecs: List = emptyList(), dsl: VerifierExposedDSLInterface.() -> A @@ -92,6 +96,7 @@ fun verifierDriver( useTestClock = useTestClock, isDebug = isDebug, startNodesInProcess = startNodesInProcess, + waitForNodesToFinish = waitForNodesToFinish, extraCordappPackagesToScan = extraCordappPackagesToScan, notarySpecs = notarySpecs ) @@ -254,17 +259,15 @@ data class VerifierDriverDSL( log.info("Starting verifier connecting to address $address") val id = verifierCount.andIncrement val jdwpPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null - val processFuture = driverDSL.executorService.fork { - val verifierName = CordaX500Name(organisation = "Verifier$id", locality = "London", country = "GB") - val baseDirectory = (driverDSL.driverDirectory / verifierName.organisation).createDirectories() - val config = createConfiguration(baseDirectory, address) - val configFilename = "verifier.conf" - writeConfig(baseDirectory, configFilename, config) - Verifier.loadConfiguration(baseDirectory, baseDirectory / configFilename).configureDevKeyAndTrustStores(verifierName) - ProcessUtilities.startJavaProcess(listOf(baseDirectory.toString()), jdwpPort = jdwpPort) - } - driverDSL.shutdownManager.registerProcessShutdown(processFuture) - return processFuture.map(::VerifierHandle) + val verifierName = CordaX500Name(organisation = "Verifier$id", locality = "London", country = "GB") + val baseDirectory = (driverDSL.driverDirectory / verifierName.organisation).createDirectories() + val config = createConfiguration(baseDirectory, address) + val configFilename = "verifier.conf" + writeConfig(baseDirectory, configFilename, config) + Verifier.loadConfiguration(baseDirectory, baseDirectory / configFilename).configureDevKeyAndTrustStores(verifierName) + val process = ProcessUtilities.startJavaProcess(listOf(baseDirectory.toString()), jdwpPort = jdwpPort) + driverDSL.shutdownManager.registerProcessShutdown(process) + return doneFuture(VerifierHandle(process)) } private fun NodeHandle.connectToNode(closure: (ClientSession) -> A): A {