CORDA-1095: Fixed rare race where the startNode future completes before the default notary is visible (#2947)

This commit is contained in:
Shams Asari 2018-04-11 15:33:55 +01:00 committed by GitHub
parent b46c3b89bd
commit f88542faa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 130 additions and 137 deletions

View File

@ -1,13 +1,10 @@
package net.corda.finance.flows package net.corda.finance.flows
import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.getCashBalance import net.corda.finance.contracts.getCashBalance
import net.corda.finance.schemas.CashSchemaV1
import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.InProcessImpl import net.corda.testing.driver.internal.InProcessImpl
@ -17,11 +14,8 @@ import org.junit.Test
class CashSelectionTest { class CashSelectionTest {
@Test @Test
fun unconsumed_cash_states() { fun `unconsumed cash states`() {
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) {
defaultNotaryNode.getOrThrow()
val node = startNode().getOrThrow() as InProcessImpl val node = startNode().getOrThrow() as InProcessImpl
val issuerRef = OpaqueBytes.of(0) val issuerRef = OpaqueBytes.of(0)
val issuedAmount = 1000.DOLLARS val issuedAmount = 1000.DOLLARS

View File

@ -52,14 +52,10 @@ class FlowsDrainingModeContentionTest {
@Test @Test
fun `draining mode does not deadlock with acks between 2 nodes`() { fun `draining mode does not deadlock with acks between 2 nodes`() {
val message = "Ground control to Major Tom" val message = "Ground control to Major Tom"
driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
defaultNotaryNode.getOrThrow()
val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password) val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password)
val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity) val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity)

View File

@ -42,8 +42,6 @@ class NodeStatePersistenceTests {
val nodeName = { val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.singleIdentity().name val nodeName = nodeHandle.nodeInfo.singleIdentity().name
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
defaultNotaryNode.getOrThrow()
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
} }
@ -76,8 +74,6 @@ class NodeStatePersistenceTests {
val nodeName = { val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.singleIdentity().name val nodeName = nodeHandle.nodeInfo.singleIdentity().name
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
defaultNotaryNode.getOrThrow()
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
} }

View File

@ -90,16 +90,11 @@ class NodeRegistrationTest {
notarySpecs = listOf(NotarySpec(notaryName)), notarySpecs = listOf(NotarySpec(notaryName)),
extraCordappPackagesToScan = listOf("net.corda.finance") extraCordappPackagesToScan = listOf("net.corda.finance")
) { ) {
val nodes = listOf( val (alice, genevieve) = listOf(
startNode(providedName = aliceName), startNode(providedName = aliceName),
startNode(providedName = genevieveName), startNode(providedName = genevieveName)
defaultNotaryNode
).transpose().getOrThrow() ).transpose().getOrThrow()
log.info("Nodes started")
val (alice, genevieve) = nodes
assertThat(registrationHandler.idsPolled).containsOnly( assertThat(registrationHandler.idsPolled).containsOnly(
aliceName.organisation, aliceName.organisation,
genevieveName.organisation, genevieveName.organisation,

View File

@ -107,8 +107,8 @@ class NodeSchemaServiceTest {
@Test @Test
fun `check node runs inclusive of notary node schema set using driverDSL`() { fun `check node runs inclusive of notary node schema set using driverDSL`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val notaryNode = defaultNotaryNode.getOrThrow().rpc.startFlow(::MappedSchemasFlow) val notary = defaultNotaryNode.getOrThrow()
val mappedSchemas = notaryNode.returnValue.getOrThrow() val mappedSchemas = notary.rpc.startFlow(::MappedSchemasFlow).returnValue.getOrThrow()
// check against NodeCore + NodeNotary Schemas // check against NodeCore + NodeNotary Schemas
assertTrue(mappedSchemas.contains(NodeCoreV1.name)) assertTrue(mappedSchemas.contains(NodeCoreV1.name))
assertTrue(mappedSchemas.contains(NodeNotaryV1.name)) assertTrue(mappedSchemas.contains(NodeNotaryV1.name))

View File

@ -34,7 +34,6 @@ class TraderDemoTest {
startFlow<CommercialPaperIssueFlow>(), startFlow<CommercialPaperIssueFlow>(),
all())) all()))
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
defaultNotaryNode.getOrThrow()
val (nodeA, nodeB, bankNode) = listOf( val (nodeA, nodeB, bankNode) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),

View File

@ -11,15 +11,13 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.NodeStartup import net.corda.node.internal.NodeStartup
import net.corda.testing.common.internal.ProjectStructure.projectRootDir import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import net.corda.testing.node.internal.addressMustBeBound import net.corda.testing.core.*
import net.corda.testing.node.internal.addressMustNotBeBound
import net.corda.testing.node.internal.internalDriver
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.internal.RandomFree import net.corda.testing.driver.internal.RandomFree
import net.corda.testing.http.HttpApi import net.corda.testing.http.HttpApi
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.addressMustBeBound
import net.corda.testing.node.internal.addressMustNotBeBound
import net.corda.testing.node.internal.internalDriver
import org.assertj.core.api.Assertions.* import org.assertj.core.api.Assertions.*
import org.json.simple.JSONObject import org.json.simple.JSONObject
import org.junit.Test import org.junit.Test
@ -70,9 +68,20 @@ class DriverTests {
} }
} }
@Test
fun `default notary is visible when the startNode future completes`() {
// Based on local testing, running this 3 times gives us a high confidence that we'll spot if the feature is not working
repeat(3) {
driver(DriverParameters(startNodesInProcess = true)) {
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertThat(bob.rpc.networkMapSnapshot().flatMap { it.legalIdentities }).contains(defaultNotaryIdentity)
}
}
}
@Test @Test
fun `random free port allocation`() { fun `random free port allocation`() {
val nodeHandle = driver(DriverParameters(portAllocation = RandomFree)) { val nodeHandle = driver(DriverParameters(portAllocation = RandomFree, notarySpecs = emptyList())) {
val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME) val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME)
nodeMustBeUp(nodeInfo) nodeMustBeUp(nodeInfo)
} }
@ -84,7 +93,11 @@ class DriverTests {
// Make sure we're using the log4j2 config which writes to the log file // Make sure we're using the log4j2 config which writes to the log file
val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml" val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml"
assertThat(logConfigFile).isRegularFile() assertThat(logConfigFile).isRegularFile()
driver(DriverParameters(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { driver(DriverParameters(
isDebug = true,
notarySpecs = emptyList(),
systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())
)) {
val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory
val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
@ -94,7 +107,7 @@ class DriverTests {
@Test @Test
fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() { fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() {
driver(DriverParameters(startNodesInProcess = false)) { driver(DriverParameters(startNodesInProcess = false, notarySpecs = emptyList())) {
// start another node so we gain access to node JMX metrics // start another node so we gain access to node JMX metrics
val webAddress = NetworkHostAndPort("localhost", 7006) val webAddress = NetworkHostAndPort("localhost", 7006)
startNode(providedName = DUMMY_REGULATOR_NAME, startNode(providedName = DUMMY_REGULATOR_NAME,
@ -123,33 +136,32 @@ class DriverTests {
@Test @Test
fun `driver rejects multiple nodes with the same name`() { fun `driver rejects multiple nodes with the same name`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
driver(DriverParameters(startNodesInProcess = true)) { assertThatThrownBy {
listOf(
assertThatThrownBy { listOf(newNode(DUMMY_BANK_A_NAME)(), newNode(DUMMY_BANK_B_NAME)(), newNode(DUMMY_BANK_A_NAME)()).transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) newNode(DUMMY_BANK_A_NAME)(),
newNode(DUMMY_BANK_B_NAME)(),
newNode(DUMMY_BANK_A_NAME)()
).transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
} }
} }
@Test @Test
fun `driver rejects multiple nodes with the same name parallel`() { fun `driver rejects multiple nodes with the same name parallel`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
driver(DriverParameters(startNodesInProcess = true)) {
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
assertThatThrownBy {
assertThatThrownBy { nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
} }
} }
@Test @Test
fun `driver allows reusing names of nodes that have been stopped`() { fun `driver allows reusing names of nodes that have been stopped`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow() val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow()
nodeA.stop() nodeA.stop()
assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException() assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException()
} }
} }

View File

@ -68,7 +68,8 @@ interface DriverDSL {
* @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted * @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted
* as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate * as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate
* megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes. * megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available. * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available and
* it sees all previously started nodes, including the notaries.
*/ */
fun startNode( fun startNode(
defaultParameters: NodeParameters = NodeParameters(), defaultParameters: NodeParameters = NodeParameters(),

View File

@ -19,14 +19,12 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis import net.corda.core.utilities.millis
import net.corda.node.NodeRegistrationOption import net.corda.node.NodeRegistrationOption
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions import net.corda.node.services.Permissions
import net.corda.node.services.config.* import net.corda.node.services.config.*
@ -57,8 +55,7 @@ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_
import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Request import okhttp3.Request
import rx.Observable import rx.Subscription
import rx.observables.ConnectableObservable
import rx.schedulers.Schedulers import rx.schedulers.Schedulers
import java.lang.management.ManagementFactory import java.lang.management.ManagementFactory
import java.net.ConnectException import java.net.ConnectException
@ -73,11 +70,10 @@ import java.time.Instant
import java.time.ZoneOffset.UTC import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import kotlin.collections.HashMap
import kotlin.concurrent.thread import kotlin.concurrent.thread
import net.corda.nodeapi.internal.config.User as InternalUser import net.corda.nodeapi.internal.config.User as InternalUser
@ -102,12 +98,11 @@ class DriverDSLImpl(
override val shutdownManager get() = _shutdownManager!! override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// Map from a nodes legal name to an observable emitting the number of nodes in its network map. // Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = ConcurrentHashMap<CordaX500Name, Observable<Int>>() private val networkVisibilityController = NetworkVisibilityController()
private val nodeNames = mutableSetOf<CordaX500Name>()
/** /**
* Future which completes when the network map is available, whether a local one or one from the CZ. This future acts * Future which completes when the network map infrastructure is available, whether a local one or one from the CZ.
* as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which * This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap]
* is null if the network map is being provided by the CZ. * object, which is null if the network map is being provided by the CZ.
*/ */
private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?> private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
private lateinit var _notaries: CordaFuture<List<NotaryHandle>> private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
@ -120,13 +115,9 @@ class DriverDSLImpl(
private val state = ThreadBox(State()) private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly. //TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy { private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") }
resolveJar(".*quasar.*\\.jar$")
}
private val jolokiaJarPath: String by lazy { private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") }
resolveJar(".*jolokia-jvm-.*-agent\\.jar$")
}
private fun resolveJar(jarNamePattern: String): String { private fun resolveJar(jarNamePattern: String): String {
return try { return try {
@ -189,12 +180,7 @@ class DriverDSLImpl(
val p2pAddress = portAllocation.nextHostAndPort() val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name // TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB") val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
synchronized(nodeNames) {
val wasANewNode = nodeNames.add(name)
if (!wasANewNode) {
throw IllegalArgumentException("Node with name $name is already started or starting.")
}
}
val registrationFuture = if (compatibilityZone?.rootCert != null) { val registrationFuture = if (compatibilityZone?.rootCert != null) {
// We don't need the network map to be available to be able to register the node // We don't need the network map to be available to be able to register the node
startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url) startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
@ -262,14 +248,20 @@ class DriverDSLImpl(
return if (startNodesInProcess) { return if (startNodesInProcess) {
executorService.fork { executorService.fork {
NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL), NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)).buildKeystore() NetworkRegistrationHelper(
config.corda,
HTTPNetworkRegistrationService(compatibilityZoneURL),
NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)
).buildKeystore()
config config
} }
} else { } else {
startOutOfProcessMiniNode(config, startOutOfProcessMiniNode(
config,
"--initial-registration", "--initial-registration",
"--network-root-truststore=${rootTruststorePath.toAbsolutePath()}", "--network-root-truststore=${rootTruststorePath.toAbsolutePath()}",
"--network-root-truststore-password=$rootTruststorePassword").map { config } "--network-root-truststore-password=$rootTruststorePassword"
).map { config }
} }
} }
@ -575,54 +567,6 @@ class DriverDSLImpl(
return driverDirectory / nodeDirectoryName return driverDirectory / nodeDirectoryName
} }
/**
* @nodeName the name of the node which performs counting
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map {
log.debug("nodeCountObservable for '$nodeName' received '$it'")
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val nodeName = rpc.nodeInfo().legalIdentities[0].name
val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates)
countObservables[nodeName] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}")
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
/** /**
* Start the node with the given flag which is expected to start the node for some function, which once complete will * Start the node with the given flag which is expected to start the node for some function, which once complete will
* terminate the node. * terminate the node.
@ -652,16 +596,14 @@ class DriverDSLImpl(
startInProcess: Boolean?, startInProcess: Boolean?,
maximumHeapSize: String, maximumHeapSize: String,
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> { localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> {
val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName)
val baseDirectory = config.corda.baseDirectory.createDirectories() val baseDirectory = config.corda.baseDirectory.createDirectories()
localNetworkMap?.networkParametersCopier?.install(baseDirectory) localNetworkMap?.networkParametersCopier?.install(baseDirectory)
localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory) localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory)
val onNodeExit: () -> Unit = { val onNodeExit: () -> Unit = {
localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory) localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory)
countObservables.remove(config.corda.myLegalName) visibilityHandle.close()
synchronized(nodeNames) {
nodeNames.remove(config.corda.myLegalName)
}
} }
val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") } val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") }
@ -678,7 +620,7 @@ class DriverDSLImpl(
) )
return nodeAndThreadFuture.flatMap { (node, thread) -> return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(config, openFuture()).flatMap { rpc -> establishRpc(config, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map { visibilityHandle.listen(rpc).map {
InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node) InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node)
} }
} }
@ -701,12 +643,13 @@ class DriverDSLImpl(
} }
establishRpc(config, processDeathFuture).flatMap { rpc -> establishRpc(config, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over: // Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } val networkMapFuture = executorService.fork { visibilityHandle.listen(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) { firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) { if (it == processDeathFuture) {
throw ListenProcessDeathException(config.corda.p2pAddress, process) throw ListenProcessDeathException(config.corda.p2pAddress, process)
} }
// Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap. // Will interrupt polling for process death as this is no longer relevant since the process been
// successfully started and reflected itself in the NetworkMap.
processDeathFuture.cancel(true) processDeathFuture.cancel(true)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit) OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit)
@ -725,7 +668,7 @@ class DriverDSLImpl(
/** /**
* The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories. * The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories.
*/ */
private inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) { inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) {
val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos)) val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos))
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/ // TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system. // This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
@ -737,12 +680,12 @@ class DriverDSLImpl(
* Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration]. * Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration].
* Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration]. * Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration].
*/ */
private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().also { nodeConfiguration -> private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()) {
val errors = nodeConfiguration.validate() init {
if (errors.isNotEmpty()) { val errors = corda.validate()
throw IllegalStateException("Invalid node configuration. Errors where:${System.lineSeparator()}${errors.joinToString(System.lineSeparator())}") require(errors.isEmpty()) { "Invalid node configuration. Errors where:\n${errors.joinToString("\n")}" }
} }
}) }
companion object { companion object {
internal val log = contextLogger() internal val log = contextLogger()
@ -910,6 +853,63 @@ class DriverDSLImpl(
} }
} }
/**
* Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all
* current nodes see everyone.
*/
private class NetworkVisibilityController {
private val nodeVisibilityHandles = ThreadBox(HashMap<CordaX500Name, VisibilityHandle>())
fun register(name: CordaX500Name): VisibilityHandle {
val handle = VisibilityHandle()
nodeVisibilityHandles.locked {
require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" }
}
return handle
}
private fun checkIfAllVisible() {
nodeVisibilityHandles.locked {
val minView = values.stream().mapToInt { it.visibleNodeCount }.min().orElse(0)
if (minView >= size) {
values.forEach { it.future.set(Unit) }
}
}
}
inner class VisibilityHandle : AutoCloseable {
internal val future = openFuture<Unit>()
internal var visibleNodeCount = 0
private var subscription: Subscription? = null
fun listen(rpc: CordaRPCOps): CordaFuture<Unit> {
check(subscription == null)
val (snapshot, updates) = rpc.networkMapFeed()
visibleNodeCount = snapshot.size
checkIfAllVisible()
subscription = updates.subscribe { when (it) {
is NetworkMapCache.MapChange.Added -> {
visibleNodeCount++
checkIfAllVisible()
}
is NetworkMapCache.MapChange.Removed -> {
visibleNodeCount--
checkIfAllVisible()
}
} }
return future
}
override fun close() {
subscription?.unsubscribe()
nodeVisibilityHandles.locked {
values -= this@VisibilityHandle
checkIfAllVisible()
}
}
}
}
interface InternalDriverDSL : DriverDSL, CordformContext { interface InternalDriverDSL : DriverDSL, CordformContext {
private companion object { private companion object {
private val DEFAULT_POLL_INTERVAL = 500.millis private val DEFAULT_POLL_INTERVAL = 500.millis