From f88542faa21e067f6656403cad89630f476e9a84 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Wed, 11 Apr 2018 15:33:55 +0100 Subject: [PATCH] CORDA-1095: Fixed rare race where the startNode future completes before the default notary is visible (#2947) --- .../corda/finance/flows/CashSelectionTest.kt | 10 +- .../FlowsDrainingModeContentionTest.kt | 4 - .../persistence/NodeStatePersistenceTests.kt | 4 - .../registration/NodeRegistrationTest.kt | 9 +- .../services/schema/NodeSchemaServiceTest.kt | 4 +- .../net/corda/traderdemo/TraderDemoTest.kt | 1 - .../net/corda/testing/driver/DriverTests.kt | 58 +++--- .../net/corda/testing/driver/DriverDSL.kt | 3 +- .../testing/node/internal/DriverDSLImpl.kt | 174 +++++++++--------- 9 files changed, 130 insertions(+), 137 deletions(-) diff --git a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt index 948bd80f22..14a6531d25 100644 --- a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt +++ b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt @@ -1,13 +1,10 @@ package net.corda.finance.flows -import net.corda.core.internal.packageName import net.corda.core.messaging.startFlow import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS -import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.getCashBalance -import net.corda.finance.schemas.CashSchemaV1 import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver import net.corda.testing.driver.internal.InProcessImpl @@ -17,11 +14,8 @@ import org.junit.Test class CashSelectionTest { @Test - fun unconsumed_cash_states() { - - driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) { - - defaultNotaryNode.getOrThrow() + fun `unconsumed cash states`() { + driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { val node = startNode().getOrThrow() as InProcessImpl val issuerRef = OpaqueBytes.of(0) val issuedAmount = 1000.DOLLARS diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt index 27e542d5a8..7a3ed586cd 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt @@ -52,14 +52,10 @@ class FlowsDrainingModeContentionTest { @Test fun `draining mode does not deadlock with acks between 2 nodes`() { - val message = "Ground control to Major Tom" - driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { - val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() - defaultNotaryNode.getOrThrow() val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password) val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity) diff --git a/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt b/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt index 7f7153d94c..27b2f22f78 100644 --- a/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt @@ -42,8 +42,6 @@ class NodeStatePersistenceTests { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name - // Ensure the notary node has finished starting up, before starting a flow that needs a notary - defaultNotaryNode.getOrThrow() CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() } @@ -76,8 +74,6 @@ class NodeStatePersistenceTests { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name - // Ensure the notary node has finished starting up, before starting a flow that needs a notary - defaultNotaryNode.getOrThrow() CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() } diff --git a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt index 685b47c057..0bfaa223a1 100644 --- a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt @@ -90,16 +90,11 @@ class NodeRegistrationTest { notarySpecs = listOf(NotarySpec(notaryName)), extraCordappPackagesToScan = listOf("net.corda.finance") ) { - val nodes = listOf( + val (alice, genevieve) = listOf( startNode(providedName = aliceName), - startNode(providedName = genevieveName), - defaultNotaryNode + startNode(providedName = genevieveName) ).transpose().getOrThrow() - log.info("Nodes started") - - val (alice, genevieve) = nodes - assertThat(registrationHandler.idsPolled).containsOnly( aliceName.organisation, genevieveName.organisation, diff --git a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt index 2734ed17b0..b4536dd550 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt @@ -107,8 +107,8 @@ class NodeSchemaServiceTest { @Test fun `check node runs inclusive of notary node schema set using driverDSL`() { driver(DriverParameters(startNodesInProcess = true)) { - val notaryNode = defaultNotaryNode.getOrThrow().rpc.startFlow(::MappedSchemasFlow) - val mappedSchemas = notaryNode.returnValue.getOrThrow() + val notary = defaultNotaryNode.getOrThrow() + val mappedSchemas = notary.rpc.startFlow(::MappedSchemasFlow).returnValue.getOrThrow() // check against NodeCore + NodeNotary Schemas assertTrue(mappedSchemas.contains(NodeCoreV1.name)) assertTrue(mappedSchemas.contains(NodeNotaryV1.name)) diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index c0d3844960..938e4243f5 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -34,7 +34,6 @@ class TraderDemoTest { startFlow(), all())) driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { - defaultNotaryNode.getOrThrow() val (nodeA, nodeB, bankNode) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), diff --git a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index 7d23099fa6..dd8a94dc4b 100644 --- a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -11,15 +11,13 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.node.internal.NodeStartup import net.corda.testing.common.internal.ProjectStructure.projectRootDir -import net.corda.testing.node.internal.addressMustBeBound -import net.corda.testing.node.internal.addressMustNotBeBound -import net.corda.testing.node.internal.internalDriver -import net.corda.testing.core.DUMMY_BANK_A_NAME -import net.corda.testing.core.DUMMY_BANK_B_NAME -import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.* import net.corda.testing.driver.internal.RandomFree import net.corda.testing.http.HttpApi import net.corda.testing.node.NotarySpec +import net.corda.testing.node.internal.addressMustBeBound +import net.corda.testing.node.internal.addressMustNotBeBound +import net.corda.testing.node.internal.internalDriver import org.assertj.core.api.Assertions.* import org.json.simple.JSONObject import org.junit.Test @@ -70,9 +68,20 @@ class DriverTests { } } + @Test + fun `default notary is visible when the startNode future completes`() { + // Based on local testing, running this 3 times gives us a high confidence that we'll spot if the feature is not working + repeat(3) { + driver(DriverParameters(startNodesInProcess = true)) { + val bob = startNode(providedName = BOB_NAME).getOrThrow() + assertThat(bob.rpc.networkMapSnapshot().flatMap { it.legalIdentities }).contains(defaultNotaryIdentity) + } + } + } + @Test fun `random free port allocation`() { - val nodeHandle = driver(DriverParameters(portAllocation = RandomFree)) { + val nodeHandle = driver(DriverParameters(portAllocation = RandomFree, notarySpecs = emptyList())) { val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME) nodeMustBeUp(nodeInfo) } @@ -84,7 +93,11 @@ class DriverTests { // Make sure we're using the log4j2 config which writes to the log file val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml" assertThat(logConfigFile).isRegularFile() - driver(DriverParameters(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + driver(DriverParameters( + isDebug = true, + notarySpecs = emptyList(), + systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()) + )) { val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } @@ -94,7 +107,7 @@ class DriverTests { @Test fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() { - driver(DriverParameters(startNodesInProcess = false)) { + driver(DriverParameters(startNodesInProcess = false, notarySpecs = emptyList())) { // start another node so we gain access to node JMX metrics val webAddress = NetworkHostAndPort("localhost", 7006) startNode(providedName = DUMMY_REGULATOR_NAME, @@ -123,33 +136,32 @@ class DriverTests { @Test fun `driver rejects multiple nodes with the same name`() { - - driver(DriverParameters(startNodesInProcess = true)) { - - assertThatThrownBy { listOf(newNode(DUMMY_BANK_A_NAME)(), newNode(DUMMY_BANK_B_NAME)(), newNode(DUMMY_BANK_A_NAME)()).transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + assertThatThrownBy { + listOf( + newNode(DUMMY_BANK_A_NAME)(), + newNode(DUMMY_BANK_B_NAME)(), + newNode(DUMMY_BANK_A_NAME)() + ).transpose().getOrThrow() + }.isInstanceOf(IllegalArgumentException::class.java) } } @Test fun `driver rejects multiple nodes with the same name parallel`() { - - driver(DriverParameters(startNodesInProcess = true)) { - + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) - - assertThatThrownBy { nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) + assertThatThrownBy { + nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() + }.isInstanceOf(IllegalArgumentException::class.java) } } @Test fun `driver allows reusing names of nodes that have been stopped`() { - - driver(DriverParameters(startNodesInProcess = true)) { - + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow() - nodeA.stop() - assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException() } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt index 71c6db96b1..6f8e3c6dbf 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt @@ -68,7 +68,8 @@ interface DriverDSL { * @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted * as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate * megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes. - * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available. + * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available and + * it sees all previously started nodes, including the notaries. */ fun startNode( defaultParameters: NodeParameters = NodeParameters(), diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 06497dfc54..08f7028bb3 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -19,14 +19,12 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.NetworkMapCache -import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.node.NodeRegistrationOption import net.corda.node.internal.Node -import net.corda.node.internal.NodeStartup import net.corda.node.internal.StartedNode import net.corda.node.services.Permissions import net.corda.node.services.config.* @@ -57,8 +55,7 @@ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT import okhttp3.OkHttpClient import okhttp3.Request -import rx.Observable -import rx.observables.ConnectableObservable +import rx.Subscription import rx.schedulers.Schedulers import java.lang.management.ManagementFactory import java.net.ConnectException @@ -73,11 +70,10 @@ import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger +import kotlin.collections.HashMap import kotlin.concurrent.thread import net.corda.nodeapi.internal.config.User as InternalUser @@ -102,12 +98,11 @@ class DriverDSLImpl( override val shutdownManager get() = _shutdownManager!! private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() // Map from a nodes legal name to an observable emitting the number of nodes in its network map. - private val countObservables = ConcurrentHashMap>() - private val nodeNames = mutableSetOf() + private val networkVisibilityController = NetworkVisibilityController() /** - * Future which completes when the network map is available, whether a local one or one from the CZ. This future acts - * as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which - * is null if the network map is being provided by the CZ. + * Future which completes when the network map infrastructure is available, whether a local one or one from the CZ. + * This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] + * object, which is null if the network map is being provided by the CZ. */ private lateinit var networkMapAvailability: CordaFuture private lateinit var _notaries: CordaFuture> @@ -120,13 +115,9 @@ class DriverDSLImpl( private val state = ThreadBox(State()) //TODO: remove this once we can bundle quasar properly. - private val quasarJarPath: String by lazy { - resolveJar(".*quasar.*\\.jar$") - } + private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") } - private val jolokiaJarPath: String by lazy { - resolveJar(".*jolokia-jvm-.*-agent\\.jar$") - } + private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") } private fun resolveJar(jarNamePattern: String): String { return try { @@ -189,12 +180,7 @@ class DriverDSLImpl( val p2pAddress = portAllocation.nextHostAndPort() // TODO: Derive name from the full picked name, don't just wrap the common name val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB") - synchronized(nodeNames) { - val wasANewNode = nodeNames.add(name) - if (!wasANewNode) { - throw IllegalArgumentException("Node with name $name is already started or starting.") - } - } + val registrationFuture = if (compatibilityZone?.rootCert != null) { // We don't need the network map to be available to be able to register the node startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url) @@ -262,14 +248,20 @@ class DriverDSLImpl( return if (startNodesInProcess) { executorService.fork { - NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL), NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)).buildKeystore() + NetworkRegistrationHelper( + config.corda, + HTTPNetworkRegistrationService(compatibilityZoneURL), + NodeRegistrationOption(rootTruststorePath, rootTruststorePassword) + ).buildKeystore() config } } else { - startOutOfProcessMiniNode(config, + startOutOfProcessMiniNode( + config, "--initial-registration", "--network-root-truststore=${rootTruststorePath.toAbsolutePath()}", - "--network-root-truststore-password=$rootTruststorePassword").map { config } + "--network-root-truststore-password=$rootTruststorePassword" + ).map { config } } } @@ -575,54 +567,6 @@ class DriverDSLImpl( return driverDirectory / nodeDirectoryName } - /** - * @nodeName the name of the node which performs counting - * @param initial number of nodes currently in the network map of a running node. - * @param networkMapCacheChangeObservable an observable returning the updates to the node network map. - * @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes - * the initial value emitted is always [initial] - */ - private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable): - ConnectableObservable { - val count = AtomicInteger(initial) - return networkMapCacheChangeObservable.map { - log.debug("nodeCountObservable for '$nodeName' received '$it'") - when (it) { - is NetworkMapCache.MapChange.Added -> count.incrementAndGet() - is NetworkMapCache.MapChange.Removed -> count.decrementAndGet() - is NetworkMapCache.MapChange.Modified -> count.get() - } - }.startWith(initial).replay() - } - - /** - * @param rpc the [CordaRPCOps] of a newly started node. - * @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes - * equal to the number of running nodes. The future will yield the number of connected nodes. - */ - private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture { - val (snapshot, updates) = rpc.networkMapFeed() - val nodeName = rpc.nodeInfo().legalIdentities[0].name - val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates) - countObservables[nodeName] = counterObservable - /* TODO: this might not always be the exact number of nodes one has to wait for, - * for example in the following sequence - * 1 start 3 nodes in order, A, B, C. - * 2 before the future returned by this function resolves, kill B - * At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes. - */ - val requiredNodes = countObservables.size - - // This is an observable which yield the minimum number of nodes in each node network map. - val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array -> - log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}") - args.map { it as Int }.min() ?: 0 - } - val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture() - counterObservable.connect() - return future - } - /** * Start the node with the given flag which is expected to start the node for some function, which once complete will * terminate the node. @@ -652,16 +596,14 @@ class DriverDSLImpl( startInProcess: Boolean?, maximumHeapSize: String, localNetworkMap: LocalNetworkMap?): CordaFuture { + val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName) val baseDirectory = config.corda.baseDirectory.createDirectories() localNetworkMap?.networkParametersCopier?.install(baseDirectory) localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory) val onNodeExit: () -> Unit = { localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory) - countObservables.remove(config.corda.myLegalName) - synchronized(nodeNames) { - nodeNames.remove(config.corda.myLegalName) - } + visibilityHandle.close() } val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") } @@ -678,7 +620,7 @@ class DriverDSLImpl( ) return nodeAndThreadFuture.flatMap { (node, thread) -> establishRpc(config, openFuture()).flatMap { rpc -> - allNodesConnected(rpc).map { + visibilityHandle.listen(rpc).map { InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node) } } @@ -701,12 +643,13 @@ class DriverDSLImpl( } establishRpc(config, processDeathFuture).flatMap { rpc -> // Check for all nodes to have all other nodes in background in case RPC is failing over: - val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } + val networkMapFuture = executorService.fork { visibilityHandle.listen(rpc) }.flatMap { it } firstOf(processDeathFuture, networkMapFuture) { if (it == processDeathFuture) { throw ListenProcessDeathException(config.corda.p2pAddress, process) } - // Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap. + // Will interrupt polling for process death as this is no longer relevant since the process been + // successfully started and reflected itself in the NetworkMap. processDeathFuture.cancel(true) log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit) @@ -725,7 +668,7 @@ class DriverDSLImpl( /** * The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories. */ - private inner class LocalNetworkMap(notaryInfos: List) { + inner class LocalNetworkMap(notaryInfos: List) { val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos)) // TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/ // This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system. @@ -737,12 +680,12 @@ class DriverDSLImpl( * Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration]. * Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration]. */ - private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().also { nodeConfiguration -> - val errors = nodeConfiguration.validate() - if (errors.isNotEmpty()) { - throw IllegalStateException("Invalid node configuration. Errors where:${System.lineSeparator()}${errors.joinToString(System.lineSeparator())}") + private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()) { + init { + val errors = corda.validate() + require(errors.isEmpty()) { "Invalid node configuration. Errors where:\n${errors.joinToString("\n")}" } } - }) + } companion object { internal val log = contextLogger() @@ -910,6 +853,63 @@ class DriverDSLImpl( } } +/** + * Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all + * current nodes see everyone. + */ +private class NetworkVisibilityController { + private val nodeVisibilityHandles = ThreadBox(HashMap()) + + fun register(name: CordaX500Name): VisibilityHandle { + val handle = VisibilityHandle() + nodeVisibilityHandles.locked { + require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" } + } + return handle + } + + private fun checkIfAllVisible() { + nodeVisibilityHandles.locked { + val minView = values.stream().mapToInt { it.visibleNodeCount }.min().orElse(0) + if (minView >= size) { + values.forEach { it.future.set(Unit) } + } + } + } + + inner class VisibilityHandle : AutoCloseable { + internal val future = openFuture() + internal var visibleNodeCount = 0 + private var subscription: Subscription? = null + + fun listen(rpc: CordaRPCOps): CordaFuture { + check(subscription == null) + val (snapshot, updates) = rpc.networkMapFeed() + visibleNodeCount = snapshot.size + checkIfAllVisible() + subscription = updates.subscribe { when (it) { + is NetworkMapCache.MapChange.Added -> { + visibleNodeCount++ + checkIfAllVisible() + } + is NetworkMapCache.MapChange.Removed -> { + visibleNodeCount-- + checkIfAllVisible() + } + } } + return future + } + + override fun close() { + subscription?.unsubscribe() + nodeVisibilityHandles.locked { + values -= this@VisibilityHandle + checkIfAllVisible() + } + } + } +} + interface InternalDriverDSL : DriverDSL, CordformContext { private companion object { private val DEFAULT_POLL_INTERVAL = 500.millis