On exit, the driver will automaticallly shutdown any nodes which weren't waited for.

The motivation for this came with the recent change that a default notary is started by the driver, which if ignored will leak the notary process.

Also, waitForAllNodesToFinish() has been replaced by a driver parameter.
This commit is contained in:
Shams Asari 2017-11-06 14:24:27 +00:00
parent b0af1c715c
commit 2fe41715cc
17 changed files with 187 additions and 193 deletions

13
.idea/compiler.xml generated
View File

@ -16,6 +16,9 @@
<module name="client_test" target="1.8" />
<module name="confidential-identities_main" target="1.8" />
<module name="confidential-identities_test" target="1.8" />
<module name="corda-core_integrationTest" target="1.8" />
<module name="corda-core_smokeTest" target="1.8" />
<module name="corda-finance_integrationTest" target="1.8" />
<module name="corda-project_main" target="1.8" />
<module name="corda-project_test" target="1.8" />
<module name="corda-webserver_integrationTest" target="1.8" />
@ -43,6 +46,8 @@
<module name="example-code_integrationTest" target="1.8" />
<module name="example-code_main" target="1.8" />
<module name="example-code_test" target="1.8" />
<module name="experimental-kryo-hook_main" target="1.8" />
<module name="experimental-kryo-hook_test" target="1.8" />
<module name="experimental_main" target="1.8" />
<module name="experimental_test" target="1.8" />
<module name="explorer-capsule_main" target="1.6" />
@ -54,8 +59,13 @@
<module name="finance_test" target="1.8" />
<module name="graphs_main" target="1.8" />
<module name="graphs_test" target="1.8" />
<module name="irs-demo-cordapp_integrationTest" target="1.8" />
<module name="irs-demo-cordapp_main" target="1.8" />
<module name="irs-demo-cordapp_main~1" target="1.8" />
<module name="irs-demo-cordapp_test" target="1.8" />
<module name="irs-demo-cordapp_test~1" target="1.8" />
<module name="irs-demo-web_main" target="1.8" />
<module name="irs-demo-web_test" target="1.8" />
<module name="irs-demo_integrationTest" target="1.8" />
<module name="irs-demo_main" target="1.8" />
<module name="irs-demo_test" target="1.8" />
@ -106,6 +116,9 @@
<module name="simm-valuation-demo_test" target="1.8" />
<module name="smoke-test-utils_main" target="1.8" />
<module name="smoke-test-utils_test" target="1.8" />
<module name="source-example-code_integrationTest" target="1.8" />
<module name="source-example-code_main" target="1.8" />
<module name="source-example-code_test" target="1.8" />
<module name="test-common_main" target="1.8" />
<module name="test-common_test" target="1.8" />
<module name="test-utils_integrationTest" target="1.8" />

View File

@ -83,6 +83,8 @@ UNRELEASED
* Replaced node configuration parameter ``certificateSigningService`` with ``compatibilityZoneURL``, which is Corda
compatibility zone network management service's address.
* ``waitForAllNodesToFinish()`` method in ``DriverDSLExposedInterface`` has instead become a parameter on driver creation.
.. _changelog_v1:
Release 1.0

View File

@ -48,7 +48,7 @@ fun main(args: Array<String>) {
startFlow<CashExitFlow>(),
invokeRpc(CordaRPCOps::nodeInfo)
))
driver(driverDirectory = baseDirectory, extraCordappPackagesToScan = listOf("net.corda.finance")) {
driver(driverDirectory = baseDirectory, extraCordappPackagesToScan = listOf("net.corda.finance"), waitForAllNodesToFinish = true) {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user)).get()
// END 1
@ -96,7 +96,6 @@ fun main(args: Array<String>) {
graph.display()
}
}
waitForAllNodesToFinish()
// END 5
}
}

View File

@ -283,7 +283,7 @@ IntelliJ
fun main(args: Array<String>) {
// No permissions required as we are not invoking flows.
val user = User("user1", "test", permissions = setOf())
driver(isDebug = true) {
driver(isDebug = true, waitForNodesToFinish = true) {
startNode(getX500Name(O="Controller",L="London",C='GB"), setOf(ServiceInfo(ValidatingNotaryService.type)))
val (nodeA, nodeB, nodeC) = Futures.allAsList(
startNode(getX500Name(O="PartyA",L="London",C="GB"), rpcUsers = listOf(user)),
@ -293,8 +293,6 @@ IntelliJ
startWebserver(nodeA)
startWebserver(nodeB)
startWebserver(nodeC)
waitForAllNodesToFinish()
}
}
@ -505,7 +503,7 @@ Debugging is done via IntelliJ as follows:
fun main(args: Array<String>) {
// No permissions required as we are not invoking flows.
val user = User("user1", "test", permissions = setOf())
driver(isDebug = true) {
driver(isDebug = true, waitForNodesToFinish = true) {
startNode(getX500Name(O="Controller",L="London",C="GB"), setOf(ServiceInfo(ValidatingNotaryService.type)))
val (nodeA, nodeB, nodeC) = Futures.allAsList(
startNode(getX500Name(O="PartyA",L=London,C=GB"), rpcUsers = listOf(user)),
@ -515,8 +513,6 @@ Debugging is done via IntelliJ as follows:
startWebserver(nodeA)
startWebserver(nodeB)
startWebserver(nodeC)
waitForAllNodesToFinish()
}
}

View File

@ -12,9 +12,8 @@ import net.corda.testing.driver.driver
*/
fun main(args: Array<String>) {
val demoUser = listOf(User("demo", "demo", setOf("StartFlow.net.corda.flows.FinalityFlow")))
driver(isDebug = true, driverDirectory = "build" / "attachment-demo-nodes") {
driver(isDebug = true, driverDirectory = "build" / "attachment-demo-nodes", waitForAllNodesToFinish = true) {
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser)
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser)
waitForAllNodesToFinish()
}
}

View File

@ -56,7 +56,7 @@ private class BankOfCordaDriver {
try {
when (role) {
Role.ISSUER -> {
driver(isDebug = true, extraCordappPackagesToScan = listOf("net.corda.finance.contracts.asset")) {
driver(isDebug = true, extraCordappPackagesToScan = listOf("net.corda.finance.contracts.asset"), waitForAllNodesToFinish = true) {
val bankUser = User(
BANK_USERNAME,
"test",
@ -76,7 +76,6 @@ private class BankOfCordaDriver {
startFlow<CashConfigDataFlow>()))
startNode(providedName = BIGCORP_LEGAL_NAME, rpcUsers = listOf(bigCorpUser))
startWebserver(bankOfCorda.get())
waitForAllNodesToFinish()
}
}
else -> {

View File

@ -10,7 +10,7 @@ import net.corda.testing.driver.driver
* Do not use in a production environment.
*/
fun main(args: Array<String>) {
driver(useTestClock = true, isDebug = true) {
driver(useTestClock = true, isDebug = true, waitForAllNodesToFinish = true) {
val (nodeA, nodeB) = listOf(
startNode(providedName = DUMMY_BANK_A.name),
startNode(providedName = DUMMY_BANK_B.name)
@ -20,7 +20,5 @@ fun main(args: Array<String>) {
startWebserver(controller)
startWebserver(nodeA)
startWebserver(nodeB)
waitForAllNodesToFinish()
}
}

View File

@ -1,8 +1,6 @@
package net.corda.test.spring
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.map
import net.corda.core.utilities.loggerFor
import net.corda.testing.driver.*
@ -14,7 +12,6 @@ import java.net.ConnectException
import java.net.URL
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
interface SpringDriverExposedDSLInterface : DriverDSLExposedInterface {
@ -62,17 +59,17 @@ fun <A> springDriver(
coerce = { it }, dsl = dsl
)
data class SpringBootDriverDSL(
val driverDSL: DriverDSL
) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface {
val log = loggerFor<SpringBootDriverDSL>()
data class SpringBootDriverDSL(private val driverDSL: DriverDSL) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface {
companion object {
val log = loggerFor<SpringBootDriverDSL>()
}
override fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture<WebserverHandle> {
val debugPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null
val processFuture = startApplication(driverDSL.executorService, handle, debugPort, clazz)
driverDSL.registerProcess(processFuture)
return processFuture.map { queryWebserver(handle, it, checkUrl) }
val process = startApplication(handle, debugPort, clazz)
driverDSL.shutdownManager.registerProcessShutdown(process)
val webReadyFuture = addressMustBeBoundFuture(driverDSL.executorService, handle.webAddress, process)
return webReadyFuture.map { queryWebserver(handle, process, checkUrl) }
}
private fun queryWebserver(handle: NodeHandle, process: Process, checkUrl: String): WebserverHandle {
@ -99,29 +96,27 @@ data class SpringBootDriverDSL(
throw IllegalStateException("Webserver at ${handle.webAddress} has died or was not reachable at URL ${url}")
}
private fun startApplication(executorService: ExecutorService, handle: NodeHandle, debugPort: Int?, clazz: Class<*>): CordaFuture<Process> {
return executorService.fork {
val className = clazz.canonicalName
ProcessUtilities.startJavaProcessImpl(
className = className, // cannot directly get class for this, so just use string
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}"
// Inherit from parent process
),
classpath = ProcessUtilities.defaultClassPath,
workingDirectory = handle.configuration.baseDirectory,
errorLogPath = Paths.get("error.$className.log"),
arguments = listOf(
"--base-directory", handle.configuration.baseDirectory.toString(),
"--server.port=${handle.webAddress.port}",
"--corda.host=${handle.configuration.rpcAddress}",
"--corda.user=${handle.configuration.rpcUsers.first().username}",
"--corda.password=${handle.configuration.rpcUsers.first().password}"
),
maximumHeapSize = null
)
}.flatMap { process -> addressMustBeBoundFuture(driverDSL.executorService, handle.webAddress, process).map { process } }
private fun startApplication(handle: NodeHandle, debugPort: Int?, clazz: Class<*>): Process {
val className = clazz.canonicalName
return ProcessUtilities.startJavaProcessImpl(
className = className, // cannot directly get class for this, so just use string
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}"
// Inherit from parent process
),
classpath = ProcessUtilities.defaultClassPath,
workingDirectory = handle.configuration.baseDirectory,
errorLogPath = Paths.get("error.$className.log"),
arguments = listOf(
"--base-directory", handle.configuration.baseDirectory.toString(),
"--server.port=${handle.webAddress.port}",
"--corda.host=${handle.configuration.rpcAddress}",
"--corda.user=${handle.configuration.rpcUsers.first().username}",
"--corda.password=${handle.configuration.rpcUsers.first().password}"
),
maximumHeapSize = null
)
}
}

View File

@ -12,7 +12,7 @@ import net.corda.testing.driver.driver
* via the web api.
*/
fun main(args: Array<String>) {
driver(isDebug = true) {
driver(isDebug = true, waitForAllNodesToFinish = true) {
val (nodeA, nodeB, nodeC) = listOf(
startNode(providedName = DUMMY_BANK_A.name),
startNode(providedName = DUMMY_BANK_B.name),
@ -22,7 +22,5 @@ fun main(args: Array<String>) {
startWebserver(nodeA)
startWebserver(nodeB)
startWebserver(nodeC)
waitForAllNodesToFinish()
}
}

View File

@ -22,13 +22,12 @@ fun main(args: Array<String>) {
startFlow<SellerFlow>(),
all())
val demoUser = listOf(User("demo", "demo", permissions))
driver(driverDirectory = "build" / "trader-demo-nodes", isDebug = true) {
driver(driverDirectory = "build" / "trader-demo-nodes", isDebug = true, waitForAllNodesToFinish = true) {
val user = User("user1", "test", permissions = setOf(startFlow<CashIssueFlow>(),
startFlow<CommercialPaperIssueFlow>(),
startFlow<SellerFlow>()))
startNode(providedName = DUMMY_BANK_A.name, rpcUsers = demoUser)
startNode(providedName = DUMMY_BANK_B.name, rpcUsers = demoUser)
startNode(providedName = BOC.name, rpcUsers = listOf(user))
waitForAllNodesToFinish()
}
}

View File

@ -7,15 +7,16 @@ import net.corda.core.internal.readLines
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.NodeStartup
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_REGULATOR
import net.corda.testing.ProjectStructure.projectRootDir
import net.corda.testing.node.NotarySpec
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
class DriverTests {
companion object {
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
@ -62,4 +63,19 @@ class DriverTests {
assertThat(debugLinesPresent).isTrue()
}
}
@Test
fun `started node, which is not waited for in the driver, is shutdown when the driver exits`() {
// First check that the process-id file is created by the node on startup, so that we can be sure our check that
// it's deleted on shutdown isn't a false-positive.
driver {
val baseDirectory = defaultNotaryNode.getOrThrow().configuration.baseDirectory
assertThat(baseDirectory / "process-id").exists()
}
val baseDirectory = driver(notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name))) {
(this as DriverDSL).baseDirectory(DUMMY_NOTARY.name)
}
assertThat(baseDirectory / "process-id").doesNotExist()
}
}

View File

@ -37,7 +37,6 @@ import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.*
import net.corda.testing.common.internal.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.setGlobalSerialization
import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
@ -61,10 +60,8 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
*
@ -172,8 +169,6 @@ interface DriverDSLExposedInterface : CordformContext {
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
fun waitForAllNodesToFinish()
/**
* Polls a function until it returns a non-null value. Note that there is no timeout on the polling.
*
@ -338,6 +333,7 @@ fun <A> driver(
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish,
notarySpecs: List<NotarySpec> = defaultParameters.notarySpecs,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
dsl: DriverDSLExposedInterface.() -> A
@ -351,6 +347,7 @@ fun <A> driver(
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForAllNodesToFinish,
notarySpecs = notarySpecs,
extraCordappPackagesToScan = extraCordappPackagesToScan
),
@ -385,6 +382,7 @@ data class DriverParameters(
val useTestClock: Boolean = false,
val initialiseSerialization: Boolean = true,
val startNodesInProcess: Boolean = false,
val waitForNodesToFinish: Boolean = false,
val notarySpecs: List<NotarySpec> = listOf(NotarySpec(DUMMY_NOTARY.name)),
val extraCordappPackagesToScan: List<String> = emptyList()
) {
@ -396,6 +394,7 @@ data class DriverParameters(
fun setUseTestClock(useTestClock: Boolean) = copy(useTestClock = useTestClock)
fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization)
fun setStartNodesInProcess(startNodesInProcess: Boolean) = copy(startNodesInProcess = startNodesInProcess)
fun setTerminateNodesOnShutdown(terminateNodesOnShutdown: Boolean) = copy(waitForNodesToFinish = terminateNodesOnShutdown)
fun setExtraCordappPackagesToScan(extraCordappPackagesToScan: List<String>) = copy(extraCordappPackagesToScan = extraCordappPackagesToScan)
fun setNotarySpecs(notarySpecs: List<NotarySpec>) = copy(notarySpecs = notarySpecs)
}
@ -446,6 +445,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
systemProperties: Map<String, String> = defaultParameters.systemProperties,
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
waitForNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
notarySpecs: List<NotarySpec>,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
@ -462,6 +462,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
)
@ -567,6 +568,7 @@ class DriverDSL(
val useTestClock: Boolean,
val isDebug: Boolean,
val startNodesInProcess: Boolean,
val waitForNodesToFinish: Boolean,
extraCordappPackagesToScan: List<String>,
val notarySpecs: List<NotarySpec>
) : DriverDSLInternalInterface {
@ -586,7 +588,7 @@ class DriverDSL(
private lateinit var networkParameters: NetworkParametersCopier
class State {
val processes = ArrayList<CordaFuture<Process>>()
val processes = ArrayList<Process>()
}
private val state = ThreadBox(State())
@ -600,20 +602,12 @@ class DriverDSL(
Paths.get(quasarFileUrl.toURI()).toString()
}
fun registerProcess(process: CordaFuture<Process>) {
shutdownManager.registerProcessShutdown(process)
state.locked {
processes.add(process)
}
}
override fun waitForAllNodesToFinish() = state.locked {
processes.transpose().get().forEach {
it.waitFor()
}
}
override fun shutdown() {
if (waitForNodesToFinish) {
state.locked {
processes.forEach { it.waitFor() }
}
}
_shutdownManager?.shutdown()
_executorService?.shutdownNow()
}
@ -708,9 +702,10 @@ class DriverDSL(
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val processFuture = DriverDSL.startWebserver(executorService, handle, debugPort, maximumHeapSize)
registerProcess(processFuture)
return processFuture.map { queryWebserver(handle, it) }
val process = DriverDSL.startWebserver(handle, debugPort, maximumHeapSize)
shutdownManager.registerProcessShutdown(process)
val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process)
return webReadyFuture.map { queryWebserver(handle, process) }
}
override fun start() {
@ -892,18 +887,22 @@ class DriverDSL(
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val processFuture = startOutOfProcessNode(executorService, configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize)
registerProcess(processFuture)
return processFuture.flatMap { process ->
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize)
if (waitForNodesToFinish) {
state.locked {
processes += process
}
} else {
shutdownManager.registerProcessShutdown(process)
}
val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process)
return p2pReadyFuture.flatMap {
val processDeathFuture = poll(executorService, "process death") {
if (process.isAlive) null else process
}
establishRpc(configuration, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val forked = executorService.fork {
allNodesConnected(rpc)
}
val networkMapFuture = forked.flatMap { it }
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(configuration.p2pAddress, process)
@ -962,7 +961,6 @@ class DriverDSL(
}
private fun startOutOfProcessNode(
executorService: ScheduledExecutorService,
nodeConf: NodeConfiguration,
config: Config,
quasarJarPath: String,
@ -970,70 +968,58 @@ class DriverDSL(
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String
): CordaFuture<Process> {
val processFuture = executorService.fork {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator),
"java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process
)
// See experimental/quasar-hook/README.md for how to generate.
val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" +
"com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" +
"com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" +
"io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" +
"org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" +
"org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" +
"org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)"
val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator),
"java.io.tmpdir" to System.getProperty("java.io.tmpdir") // Inherit from parent process
)
// See experimental/quasar-hook/README.md for how to generate.
val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" +
"com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" +
"com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" +
"io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" +
"org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" +
"org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" +
"org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)"
val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell"
),
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
return processFuture.flatMap { process ->
addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process }
}
return ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell"
),
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
private fun startWebserver(
executorService: ScheduledExecutorService,
handle: NodeHandle,
debugPort: Int?,
maximumHeapSize: String
): CordaFuture<Process> {
return executorService.fork {
val className = "net.corda.webserver.WebServer"
ProcessUtilities.startCordaProcess(
className = className, // cannot directly get class for this, so just use string
arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()),
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
),
errorLogPath = Paths.get("error.$className.log"),
workingDirectory = null,
maximumHeapSize = maximumHeapSize
)
}.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } }
private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process {
val className = "net.corda.webserver.WebServer"
return ProcessUtilities.startCordaProcess(
className = className, // cannot directly get class for this, so just use string
arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()),
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
),
errorLogPath = Paths.get("error.$className.log"),
workingDirectory = null,
maximumHeapSize = maximumHeapSize
)
}
private fun getCallerPackage(): String {

View File

@ -3,14 +3,12 @@ package net.corda.testing.driver
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.utilities.Try
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
@ -46,6 +44,7 @@ class ShutdownManager(private val executorService: ExecutorService) {
registeredShutdowns
}
}
val shutdowns = shutdownActionFutures.map { Try.on { it.getOrThrow(1.seconds) } }
shutdowns.reversed().forEach {
when (it) {
@ -63,30 +62,26 @@ class ShutdownManager(private val executorService: ExecutorService) {
fun registerShutdown(shutdown: CordaFuture<() -> Unit>) {
state.locked {
require(!isShutdown)
registeredShutdowns.add(shutdown)
registeredShutdowns += shutdown
}
}
fun registerShutdown(shutdown: () -> Unit) = registerShutdown(doneFuture(shutdown))
fun registerProcessShutdown(processFuture: CordaFuture<Process>) {
val processShutdown = processFuture.map { process ->
{
process.destroy()
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit {
process.waitFor()
}
try {
finishedFuture.get(5, TimeUnit.SECONDS)
} catch (exception: TimeoutException) {
finishedFuture.cancel(true)
process.destroyForcibly()
}
Unit
fun registerProcessShutdown(process: Process) {
registerShutdown {
process.destroy()
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit {
process.waitFor()
}
try {
finishedFuture.getOrThrow(5.seconds)
} catch (timeout: TimeoutException) {
finishedFuture.cancel(true)
process.destroyForcibly()
}
}
registerShutdown(processShutdown)
}
interface Follower {

View File

@ -7,6 +7,7 @@ import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.div
@ -229,6 +230,7 @@ fun <A> rpcDriver(
useTestClock: Boolean = false,
initialiseSerialization: Boolean = true,
startNodesInProcess: Boolean = false,
waitForNodesToFinish: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(),
notarySpecs: List<NotarySpec> = emptyList(),
dsl: RPCDriverExposedDSLInterface.() -> A
@ -242,6 +244,7 @@ fun <A> rpcDriver(
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
)
@ -404,11 +407,9 @@ data class RPCDriverDSL(
}
override fun <I : RPCOps> startRandomRpcClient(rpcOpsClass: Class<I>, rpcAddress: NetworkHostAndPort, username: String, password: String): CordaFuture<Process> {
val processFuture = driverDSL.executorService.fork {
ProcessUtilities.startJavaProcess<RandomRpcUser>(listOf(rpcOpsClass.name, rpcAddress.toString(), username, password))
}
driverDSL.shutdownManager.registerProcessShutdown(processFuture)
return processFuture
val process = ProcessUtilities.startJavaProcess<RandomRpcUser>(listOf(rpcOpsClass.name, rpcAddress.toString(), username, password))
driverDSL.shutdownManager.registerProcessShutdown(process)
return doneFuture(process)
}
override fun startArtemisSession(rpcAddress: NetworkHostAndPort, username: String, password: String): ClientSession {

View File

@ -16,12 +16,9 @@ fun CordformDefinition.clean() {
* Creates and starts all nodes required for the demo.
*/
// TODO add notaries to cordform!
fun CordformDefinition.runNodes() = driver(
isDebug = true,
driverDirectory = driverDirectory,
portAllocation = PortAllocation.Incremental(10001)
) {
setup(this)
startNodes(nodeConfigurers.map { configurer -> CordformNode().also { configurer.accept(it) } })
waitForAllNodesToFinish()
fun CordformDefinition.runNodes() {
driver(isDebug = true, driverDirectory = driverDirectory, portAllocation = PortAllocation.Incremental(10001), waitForAllNodesToFinish = true) {
setup(this)
startNodes(nodeConfigurers.map { configurer -> CordformNode().also { configurer.accept(it) } })
}
}

View File

@ -64,7 +64,7 @@ class ExplorerSimulation(private val options: OptionSet) {
private fun startDemoNodes() {
val portAllocation = PortAllocation.Incremental(20000)
driver(portAllocation = portAllocation, extraCordappPackagesToScan = listOf("net.corda.finance")) {
driver(portAllocation = portAllocation, extraCordappPackagesToScan = listOf("net.corda.finance"), waitForAllNodesToFinish = true) {
// TODO : Supported flow should be exposed somehow from the node instead of set of ServiceInfo.
val alice = startNode(providedName = ALICE.name, rpcUsers = listOf(user))
val bob = startNode(providedName = BOB.name, rpcUsers = listOf(user))
@ -89,8 +89,6 @@ class ExplorerSimulation(private val options: OptionSet) {
options.has("S") -> startNormalSimulation()
options.has("F") -> startErrorFlowsSimulation()
}
waitForAllNodesToFinish()
}
}

View File

@ -5,7 +5,10 @@ import com.typesafe.config.ConfigFactory
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.concurrent.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.serialization.internal.nodeSerializationEnv
@ -79,6 +82,7 @@ fun <A> verifierDriver(
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
startNodesInProcess: Boolean = false,
waitForNodesToFinish: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(),
notarySpecs: List<NotarySpec> = emptyList(),
dsl: VerifierExposedDSLInterface.() -> A
@ -92,6 +96,7 @@ fun <A> verifierDriver(
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
)
@ -254,17 +259,15 @@ data class VerifierDriverDSL(
log.info("Starting verifier connecting to address $address")
val id = verifierCount.andIncrement
val jdwpPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null
val processFuture = driverDSL.executorService.fork {
val verifierName = CordaX500Name(organisation = "Verifier$id", locality = "London", country = "GB")
val baseDirectory = (driverDSL.driverDirectory / verifierName.organisation).createDirectories()
val config = createConfiguration(baseDirectory, address)
val configFilename = "verifier.conf"
writeConfig(baseDirectory, configFilename, config)
Verifier.loadConfiguration(baseDirectory, baseDirectory / configFilename).configureDevKeyAndTrustStores(verifierName)
ProcessUtilities.startJavaProcess<Verifier>(listOf(baseDirectory.toString()), jdwpPort = jdwpPort)
}
driverDSL.shutdownManager.registerProcessShutdown(processFuture)
return processFuture.map(::VerifierHandle)
val verifierName = CordaX500Name(organisation = "Verifier$id", locality = "London", country = "GB")
val baseDirectory = (driverDSL.driverDirectory / verifierName.organisation).createDirectories()
val config = createConfiguration(baseDirectory, address)
val configFilename = "verifier.conf"
writeConfig(baseDirectory, configFilename, config)
Verifier.loadConfiguration(baseDirectory, baseDirectory / configFilename).configureDevKeyAndTrustStores(verifierName)
val process = ProcessUtilities.startJavaProcess<Verifier>(listOf(baseDirectory.toString()), jdwpPort = jdwpPort)
driverDSL.shutdownManager.registerProcessShutdown(process)
return doneFuture(VerifierHandle(process))
}
private fun <A> NodeHandle.connectToNode(closure: (ClientSession) -> A): A {