CORDA-1095: Fixed rare race where the startNode future completes before the default notary is visible (#2947)

This commit is contained in:
Shams Asari 2018-04-11 15:33:55 +01:00 committed by GitHub
parent b46c3b89bd
commit f88542faa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 130 additions and 137 deletions

View File

@ -1,13 +1,10 @@
package net.corda.finance.flows
import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.schemas.CashSchemaV1
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.InProcessImpl
@ -17,11 +14,8 @@ import org.junit.Test
class CashSelectionTest {
@Test
fun unconsumed_cash_states() {
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) {
defaultNotaryNode.getOrThrow()
fun `unconsumed cash states`() {
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
val node = startNode().getOrThrow() as InProcessImpl
val issuerRef = OpaqueBytes.of(0)
val issuedAmount = 1000.DOLLARS

View File

@ -52,14 +52,10 @@ class FlowsDrainingModeContentionTest {
@Test
fun `draining mode does not deadlock with acks between 2 nodes`() {
val message = "Ground control to Major Tom"
driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
defaultNotaryNode.getOrThrow()
val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password)
val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity)

View File

@ -42,8 +42,6 @@ class NodeStatePersistenceTests {
val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
defaultNotaryNode.getOrThrow()
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
}
@ -76,8 +74,6 @@ class NodeStatePersistenceTests {
val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
defaultNotaryNode.getOrThrow()
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
}

View File

@ -90,16 +90,11 @@ class NodeRegistrationTest {
notarySpecs = listOf(NotarySpec(notaryName)),
extraCordappPackagesToScan = listOf("net.corda.finance")
) {
val nodes = listOf(
val (alice, genevieve) = listOf(
startNode(providedName = aliceName),
startNode(providedName = genevieveName),
defaultNotaryNode
startNode(providedName = genevieveName)
).transpose().getOrThrow()
log.info("Nodes started")
val (alice, genevieve) = nodes
assertThat(registrationHandler.idsPolled).containsOnly(
aliceName.organisation,
genevieveName.organisation,

View File

@ -107,8 +107,8 @@ class NodeSchemaServiceTest {
@Test
fun `check node runs inclusive of notary node schema set using driverDSL`() {
driver(DriverParameters(startNodesInProcess = true)) {
val notaryNode = defaultNotaryNode.getOrThrow().rpc.startFlow(::MappedSchemasFlow)
val mappedSchemas = notaryNode.returnValue.getOrThrow()
val notary = defaultNotaryNode.getOrThrow()
val mappedSchemas = notary.rpc.startFlow(::MappedSchemasFlow).returnValue.getOrThrow()
// check against NodeCore + NodeNotary Schemas
assertTrue(mappedSchemas.contains(NodeCoreV1.name))
assertTrue(mappedSchemas.contains(NodeNotaryV1.name))

View File

@ -34,7 +34,6 @@ class TraderDemoTest {
startFlow<CommercialPaperIssueFlow>(),
all()))
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
defaultNotaryNode.getOrThrow()
val (nodeA, nodeB, bankNode) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),

View File

@ -11,15 +11,13 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.NodeStartup
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import net.corda.testing.node.internal.addressMustBeBound
import net.corda.testing.node.internal.addressMustNotBeBound
import net.corda.testing.node.internal.internalDriver
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.*
import net.corda.testing.driver.internal.RandomFree
import net.corda.testing.http.HttpApi
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.addressMustBeBound
import net.corda.testing.node.internal.addressMustNotBeBound
import net.corda.testing.node.internal.internalDriver
import org.assertj.core.api.Assertions.*
import org.json.simple.JSONObject
import org.junit.Test
@ -70,9 +68,20 @@ class DriverTests {
}
}
@Test
fun `default notary is visible when the startNode future completes`() {
    // The race this guards against is intermittent. Based on local testing, three consecutive runs give us high
    // confidence that we'll spot it if the feature is not working.
    val runs = 3
    repeat(runs) {
        driver(DriverParameters(startNodesInProcess = true)) {
            val bobHandle = startNode(providedName = BOB_NAME).getOrThrow()
            // Everything Bob can see in his network map at the moment his startNode future has completed.
            val identitiesKnownToBob = bobHandle.rpc.networkMapSnapshot().flatMap { nodeInfo -> nodeInfo.legalIdentities }
            assertThat(identitiesKnownToBob).contains(defaultNotaryIdentity)
        }
    }
}
@Test
fun `random free port allocation`() {
val nodeHandle = driver(DriverParameters(portAllocation = RandomFree)) {
val nodeHandle = driver(DriverParameters(portAllocation = RandomFree, notarySpecs = emptyList())) {
val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME)
nodeMustBeUp(nodeInfo)
}
@ -84,7 +93,11 @@ class DriverTests {
// Make sure we're using the log4j2 config which writes to the log file
val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml"
assertThat(logConfigFile).isRegularFile()
driver(DriverParameters(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
driver(DriverParameters(
isDebug = true,
notarySpecs = emptyList(),
systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())
)) {
val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory
val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
@ -94,7 +107,7 @@ class DriverTests {
@Test
fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() {
driver(DriverParameters(startNodesInProcess = false)) {
driver(DriverParameters(startNodesInProcess = false, notarySpecs = emptyList())) {
// start another node so we gain access to node JMX metrics
val webAddress = NetworkHostAndPort("localhost", 7006)
startNode(providedName = DUMMY_REGULATOR_NAME,
@ -123,33 +136,32 @@ class DriverTests {
@Test
fun `driver rejects multiple nodes with the same name`() {
driver(DriverParameters(startNodesInProcess = true)) {
assertThatThrownBy { listOf(newNode(DUMMY_BANK_A_NAME)(), newNode(DUMMY_BANK_B_NAME)(), newNode(DUMMY_BANK_A_NAME)()).transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java)
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
assertThatThrownBy {
listOf(
newNode(DUMMY_BANK_A_NAME)(),
newNode(DUMMY_BANK_B_NAME)(),
newNode(DUMMY_BANK_A_NAME)()
).transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
}
}
@Test
fun `driver rejects multiple nodes with the same name parallel`() {
driver(DriverParameters(startNodesInProcess = true)) {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
assertThatThrownBy { nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java)
assertThatThrownBy {
nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow()
}.isInstanceOf(IllegalArgumentException::class.java)
}
}
@Test
fun `driver allows reusing names of nodes that have been stopped`() {
driver(DriverParameters(startNodesInProcess = true)) {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow()
nodeA.stop()
assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException()
}
}

View File

@ -68,7 +68,8 @@ interface DriverDSL {
* @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted
* as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate
* megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available and
* it sees all previously started nodes, including the notaries.
*/
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),

View File

@ -19,14 +19,12 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.node.NodeRegistrationOption
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions
import net.corda.node.services.config.*
@ -57,8 +55,7 @@ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_
import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT
import okhttp3.OkHttpClient
import okhttp3.Request
import rx.Observable
import rx.observables.ConnectableObservable
import rx.Subscription
import rx.schedulers.Schedulers
import java.lang.management.ManagementFactory
import java.net.ConnectException
@ -73,11 +70,10 @@ import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.HashMap
import kotlin.concurrent.thread
import net.corda.nodeapi.internal.config.User as InternalUser
@ -102,12 +98,11 @@ class DriverDSLImpl(
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// Map from a node's legal name to an observable emitting the number of nodes in its network map.
private val countObservables = ConcurrentHashMap<CordaX500Name, Observable<Int>>()
private val nodeNames = mutableSetOf<CordaX500Name>()
private val networkVisibilityController = NetworkVisibilityController()
/**
* Future which completes when the network map is available, whether a local one or one from the CZ. This future acts
* as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which
* is null if the network map is being provided by the CZ.
* Future which completes when the network map infrastructure is available, whether a local one or one from the CZ.
* This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap]
* object, which is null if the network map is being provided by the CZ.
*/
private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
@ -120,13 +115,9 @@ class DriverDSLImpl(
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy {
resolveJar(".*quasar.*\\.jar$")
}
private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") }
private val jolokiaJarPath: String by lazy {
resolveJar(".*jolokia-jvm-.*-agent\\.jar$")
}
private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") }
private fun resolveJar(jarNamePattern: String): String {
return try {
@ -189,12 +180,7 @@ class DriverDSLImpl(
val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
synchronized(nodeNames) {
val wasANewNode = nodeNames.add(name)
if (!wasANewNode) {
throw IllegalArgumentException("Node with name $name is already started or starting.")
}
}
val registrationFuture = if (compatibilityZone?.rootCert != null) {
// We don't need the network map to be available to be able to register the node
startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
@ -262,14 +248,20 @@ class DriverDSLImpl(
return if (startNodesInProcess) {
executorService.fork {
NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL), NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)).buildKeystore()
NetworkRegistrationHelper(
config.corda,
HTTPNetworkRegistrationService(compatibilityZoneURL),
NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)
).buildKeystore()
config
}
} else {
startOutOfProcessMiniNode(config,
startOutOfProcessMiniNode(
config,
"--initial-registration",
"--network-root-truststore=${rootTruststorePath.toAbsolutePath()}",
"--network-root-truststore-password=$rootTruststorePassword").map { config }
"--network-root-truststore-password=$rootTruststorePassword"
).map { config }
}
}
@ -575,54 +567,6 @@ class DriverDSLImpl(
return driverDirectory / nodeDirectoryName
}
/**
 * Turns the network map updates of a single node into a running count of the nodes that node knows about.
 *
 * The returned observable is not connected here; the caller is responsible for calling `connect()` on it.
 *
 * @param nodeName the name of the node which performs the counting. Only used for debug logging.
 * @param initial the number of nodes currently in the network map of the running node.
 * @param networkMapCacheChangeObservable an observable returning the updates to that node's network map.
 * @return a [ConnectableObservable] which emits [initial] first and then one [Int] per received update: the new count
 * after an addition or a removal, and the unchanged count after a modification.
 */
private fun nodeCountObservable(
        nodeName: CordaX500Name,
        initial: Int,
        networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>
): ConnectableObservable<Int> {
    val knownNodes = AtomicInteger(initial)
    val runningCounts = networkMapCacheChangeObservable.map {
        log.debug("nodeCountObservable for '$nodeName' received '$it'")
        when (it) {
            is NetworkMapCache.MapChange.Added -> knownNodes.incrementAndGet()
            is NetworkMapCache.MapChange.Removed -> knownNodes.decrementAndGet()
            is NetworkMapCache.MapChange.Modified -> knownNodes.get()
        }
    }
    return runningCounts.startWith(initial).replay()
}
/**
* Registers the network map feed of a newly started node in [countObservables] and builds a future that waits for
* every registered node to see enough of the network.
*
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* at least equal to the number of nodes registered at the time of this call. The future will yield the number of
* connected nodes, i.e. the smallest network map size that satisfied that condition.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
// The first legal identity is used as the key under which this node's counter is registered.
val nodeName = rpc.nodeInfo().legalIdentities[0].name
val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates)
// Register before reading the map below so that this node is included in both the required count and the
// combined observable.
countObservables[nodeName] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
// NOTE(review): countObservables is read twice (size here, values below) with nothing making the two reads atomic,
// so a node registering or exiting in between would make them disagree - confirm whether this matters.
val requiredNodes = countObservables.size
// This is an observable which yields the minimum number of nodes seen across each registered node's network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}")
// Falls back to 0 when there are no values to take the minimum of.
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
// Connect last, once the future has been set up on top of this node's counter.
counterObservable.connect()
return future
}
/**
* Start the node with the given flag which is expected to start the node for some function, which once complete will
* terminate the node.
@ -652,16 +596,14 @@ class DriverDSLImpl(
startInProcess: Boolean?,
maximumHeapSize: String,
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> {
val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName)
val baseDirectory = config.corda.baseDirectory.createDirectories()
localNetworkMap?.networkParametersCopier?.install(baseDirectory)
localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory)
val onNodeExit: () -> Unit = {
localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory)
countObservables.remove(config.corda.myLegalName)
synchronized(nodeNames) {
nodeNames.remove(config.corda.myLegalName)
}
visibilityHandle.close()
}
val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") }
@ -678,7 +620,7 @@ class DriverDSLImpl(
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(config, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
visibilityHandle.listen(rpc).map {
InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node)
}
}
@ -701,12 +643,13 @@ class DriverDSLImpl(
}
establishRpc(config, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
val networkMapFuture = executorService.fork { visibilityHandle.listen(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(config.corda.p2pAddress, process)
}
// Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap.
// Will interrupt polling for process death as this is no longer relevant since the process has been
// successfully started and reflected itself in the NetworkMap.
processDeathFuture.cancel(true)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit)
@ -725,7 +668,7 @@ class DriverDSLImpl(
/**
* The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories.
*/
private inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) {
inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) {
val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos))
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
@ -737,12 +680,12 @@ class DriverDSLImpl(
* Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration].
* Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration].
*/
private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().also { nodeConfiguration ->
val errors = nodeConfiguration.validate()
if (errors.isNotEmpty()) {
throw IllegalStateException("Invalid node configuration. Errors where:${System.lineSeparator()}${errors.joinToString(System.lineSeparator())}")
private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()) {
init {
val errors = corda.validate()
require(errors.isEmpty()) { "Invalid node configuration. Errors where:\n${errors.joinToString("\n")}" }
}
})
}
companion object {
internal val log = contextLogger()
@ -910,6 +853,63 @@ class DriverDSLImpl(
}
}
/**
 * Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all
 * current nodes see everyone.
 *
 * Every node is represented by a [VisibilityHandle] obtained from [register]. Each time a handle's view of the network
 * changes, or a handle is closed, all registered handles are re-checked: if the smallest view is at least as large as
 * the number of registered handles, the futures of all registered handles are completed.
 */
private class NetworkVisibilityController {
    // Handles of the registered nodes keyed by legal name. Every access goes through ThreadBox.locked.
    private val nodeVisibilityHandles = ThreadBox(HashMap<CordaX500Name, VisibilityHandle>())

    /**
     * Reserves [name] and creates the handle through which the node's view of the network will be tracked.
     *
     * @param name the legal name of the node which is about to be started.
     * @return the new [VisibilityHandle]. It counts as seeing zero nodes until [VisibilityHandle.listen] is called,
     * which holds back the futures of all other handles in the meantime.
     * @throws IllegalArgumentException if a handle is already registered under [name].
     */
    fun register(name: CordaX500Name): VisibilityHandle {
        val handle = VisibilityHandle()
        nodeVisibilityHandles.locked {
            require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" }
        }
        return handle
    }

    /**
     * Sets the future of every registered handle if the smallest [VisibilityHandle.visibleNodeCount] among them is at
     * least the number of registered handles. With no registered handles the condition holds trivially and there is
     * nothing to set.
     */
    private fun checkIfAllVisible() {
        nodeVisibilityHandles.locked {
            val minView = values.stream().mapToInt { it.visibleNodeCount }.min().orElse(0)
            if (minView >= size) {
                values.forEach { it.future.set(Unit) }
            }
        }
    }

    /**
     * Tracks the number of nodes a single node has in its network map. Closing the handle stops the tracking and
     * removes the node from the set of nodes the others are waiting for.
     */
    inner class VisibilityHandle : AutoCloseable {
        internal val future = openFuture<Unit>()
        internal var visibleNodeCount = 0
        private var subscription: Subscription? = null

        /**
         * Starts following the network map of the node behind [rpc].
         *
         * @param rpc the [CordaRPCOps] of the node this handle was registered for.
         * @return the future which is set once all currently registered nodes see everyone.
         * @throws IllegalStateException if this handle is already listening.
         */
        fun listen(rpc: CordaRPCOps): CordaFuture<Unit> {
            check(subscription == null) { "This visibility handle is already listening to a network map feed" }
            val (snapshot, updates) = rpc.networkMapFeed()
            visibleNodeCount = snapshot.size
            // The snapshot alone may already be enough, in which case no further update would trigger the check.
            checkIfAllVisible()
            subscription = updates.subscribe { change ->
                when (change) {
                    is NetworkMapCache.MapChange.Added -> {
                        visibleNodeCount++
                        checkIfAllVisible()
                    }
                    is NetworkMapCache.MapChange.Removed -> {
                        visibleNodeCount--
                        checkIfAllVisible()
                    }
                    is NetworkMapCache.MapChange.Modified -> {
                        // A modified entry doesn't change how many nodes are visible. Listed to keep the when exhaustive.
                    }
                }
            }
            return future
        }

        /**
         * Stops listening for network map updates and unregisters this handle. The remaining handles are re-checked
         * as they now have one node fewer to wait for.
         */
        override fun close() {
            subscription?.unsubscribe()
            nodeVisibilityHandles.locked {
                values.remove(this@VisibilityHandle)
                checkIfAllVisible()
            }
        }
    }
}
interface InternalDriverDSL : DriverDSL, CordformContext {
private companion object {
private val DEFAULT_POLL_INTERVAL = 500.millis