diff --git a/build.gradle b/build.gradle index 07a99bf735..834347ca42 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,7 @@ buildscript { ext.jopt_simple_version = '5.0.2' ext.jansi_version = '1.14' ext.hibernate_version = '5.2.6.Final' - ext.h2_version = '1.4.194' // Update docs if renamed or removed. + ext.h2_version = '1.4.197' // Update docs if renamed or removed. ext.postgresql_version = '42.1.4' ext.rxjava_version = '1.2.4' ext.dokka_version = '0.9.16-eap-2' diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 2b91ddca64..3bd1122770 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -44,9 +44,15 @@ Unreleased * java.security.cert.X509CRL serialization support added. -* Added ``NetworkMapCache.getNodesByLegalName`` for querying nodes belonging to a distributed service such as a notary cluster - where they all share a common identity. ``NetworkMapCache.getNodeByLegalName`` has been tightened to throw if more than - one node with the legal name is found. +* Upgraded H2 to v1.4.197. + +* Shell (embedded available only in dev mode or via SSH) connects to the node via RPC instead of using the ``CordaRPCOps`` object directly. + To enable RPC connectivity ensure node’s ``rpcSettings.address`` and ``rpcSettings.adminAddress`` settings are present. + +.. _changelog_v3: + +Version 3.0 +----------- * Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated at CorDapp context creation time from a file source during runtime. diff --git a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt index de1a0658c4..bf055a3fa4 100644 --- a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt +++ b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt @@ -1,13 +1,10 @@ package net.corda.finance.flows -import net.corda.core.internal.packageName import net.corda.core.messaging.startFlow import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS -import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.getCashBalance -import net.corda.finance.schemas.CashSchemaV1 import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME @@ -31,11 +28,8 @@ class CashSelectionTest : IntegrationTest() { } @Test - fun unconsumed_cash_states() { - - driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) { - - defaultNotaryNode.getOrThrow() + fun `unconsumed cash states`() { + driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { val node = startNode().getOrThrow() as InProcessImpl val issuerRef = OpaqueBytes.of(0) val issuedAmount = 1000.DOLLARS diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt index 67e2db0377..6f85002a6c 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt @@ -11,6 +11,7 @@ package net.corda.finance.contracts.asset.cash.selection import net.corda.core.contracts.Amount +import net.corda.core.crypto.toStringShort import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.utilities.* @@ -54,8 +55,12 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { " AND vs.notary_name = ?" else "") + (if (onlyFromIssuerParties.isNotEmpty()) " AND ccs.issuer_key_hash = ANY (?)" else "") + - (if (withIssuerRefs.isNotEmpty()) - " AND ccs.issuer_ref = ANY (?)" else "") + + (if (withIssuerRefs.isNotEmpty()) { + val repeats = generateSequence { "?" } + .take(withIssuerRefs.size) + .joinToString(",") + " AND ccs.issuer_ref IN ($repeats)" + } else "") + """) nested WHERE nested.total < ? """ @@ -70,14 +75,12 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { } if (onlyFromIssuerParties.isNotEmpty()) { val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map - { it.owningKey.toBase58String() }.toTypedArray()) + { it.owningKey.toStringShort() }.toTypedArray()) statement.setArray(3 + paramOffset, issuerKeys) paramOffset += 1 } - if (withIssuerRefs.isNotEmpty()) { - val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map - { it.bytes }.toTypedArray()) - statement.setArray(3 + paramOffset, issuerRefs) + withIssuerRefs.map { it.bytes }.forEach { + statement.setBytes( 3 + paramOffset, it) paramOffset += 1 } statement.setLong(3 + paramOffset, amount.quantity) diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt index bb51d257ad..f1fc551dfe 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt @@ -66,14 +66,10 @@ class FlowsDrainingModeContentionTest : IntegrationTest() { @Test fun `draining mode does not deadlock with acks between 2 nodes`() { - val message = "Ground control to Major Tom" - driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { - val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() - defaultNotaryNode.getOrThrow() val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password) val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity) diff --git a/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt b/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt index 96b4fd4f4b..e32ada9538 100644 --- a/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt @@ -62,8 +62,6 @@ class NodeStatePersistenceTests : IntegrationTest() { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name - // Ensure the notary node has finished starting up, before starting a flow that needs a notary - defaultNotaryNode.getOrThrow() CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() } @@ -96,8 +94,6 @@ class NodeStatePersistenceTests : IntegrationTest() { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name - // Ensure the notary node has finished starting up, before starting a flow that needs a notary - defaultNotaryNode.getOrThrow() CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() } diff --git a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt index 49975cb02e..d2254dbe7f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt @@ -103,16 +103,11 @@ class NodeRegistrationTest : IntegrationTest() { notarySpecs = listOf(NotarySpec(notaryName)), extraCordappPackagesToScan = listOf("net.corda.finance") ) { - val nodes = listOf( + val (alice, genevieve) = listOf( startNode(providedName = aliceName), - startNode(providedName = genevieveName), - defaultNotaryNode + startNode(providedName = genevieveName) ).transpose().getOrThrow() - log.info("Nodes started") - - val (alice, genevieve) = nodes - assertThat(registrationHandler.idsPolled).containsOnly( aliceName.organisation, genevieveName.organisation, diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index fe6da562d7..9241482528 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -38,11 +38,8 @@ import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.SecurityConfiguration -import net.corda.node.services.config.VerifierType +import net.corda.node.services.config.* import net.corda.node.services.config.shell.localShellUser -import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.messaging.* import net.corda.node.services.rpc.ArtemisRpcBroker import net.corda.node.services.transactions.InMemoryTransactionVerifierService @@ -174,7 +171,7 @@ open class Node(configuration: NodeConfiguration, val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers) securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) { - if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this + if (configuration.shouldStartLocalShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this } if (!configuration.messagingServerExternal) { diff --git a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt index eea4fe8edc..c8dd22f3eb 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt @@ -117,8 +117,8 @@ class NodeSchemaServiceTest { @Test fun `check node runs inclusive of notary node schema set using driverDSL`() { driver(DriverParameters(startNodesInProcess = true)) { - val notaryNode = defaultNotaryNode.getOrThrow().rpc.startFlow(::MappedSchemasFlow) - val mappedSchemas = notaryNode.returnValue.getOrThrow() + val notary = defaultNotaryNode.getOrThrow() + val mappedSchemas = notary.rpc.startFlow(::MappedSchemasFlow).returnValue.getOrThrow() // check against NodeCore + NodeNotary Schemas assertTrue(mappedSchemas.contains(NodeCoreV1.name)) assertTrue(mappedSchemas.contains(NodeNotaryV1.name)) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt index 87cd8ab449..3f4a15c701 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt @@ -19,15 +19,14 @@ import net.corda.core.internal.packageName import net.corda.core.node.services.* import net.corda.core.node.services.vault.* import net.corda.core.node.services.vault.QueryCriteria.* -import net.corda.core.utilities.NonEmptySet -import net.corda.core.utilities.days -import net.corda.core.utilities.seconds -import net.corda.core.utilities.toHexString +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.* import net.corda.finance.* import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.Commodity import net.corda.finance.contracts.DealState import net.corda.finance.contracts.asset.Cash +import net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection import net.corda.finance.sampleschemas.SampleCashSchemaV3 import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.schemas.CashSchemaV1.PersistentCashState @@ -52,7 +51,6 @@ import java.time.LocalDate import java.time.ZoneOffset import java.time.temporal.ChronoUnit import java.util.* -import kotlin.test.assertTrue open class VaultQueryTests { private companion object { @@ -2064,6 +2062,39 @@ open class VaultQueryTests { } } + @Test + fun `unconsumedCashStatesForSpending_single_issuer_reference`() { + database.transaction { + vaultFiller.fillWithSomeTestCash(1000.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER) + } + database.transaction { + val builder = TransactionBuilder() + val issuer = DUMMY_CASH_ISSUER + val exitStates = AbstractCashSelection + .getInstance { services.jdbcSession().metaData } + .unconsumedCashStatesForSpending(services, 300.DOLLARS, setOf(issuer.party), + builder.notary, builder.lockId, setOf(issuer.reference)) + + assertThat(exitStates).hasSize(1) + assertThat(exitStates[0].state.data.amount.quantity).isEqualTo(100000) + } + } + + @Test + fun `unconsumedCashStatesForSpending_single_issuer_reference_not_matching`() { + database.transaction { + vaultFiller.fillWithSomeTestCash(1000.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER) + } + database.transaction { + val builder = TransactionBuilder() + val issuer = DUMMY_CASH_ISSUER + val exitStates = AbstractCashSelection + .getInstance { services.jdbcSession().metaData } + .unconsumedCashStatesForSpending(services, 300.DOLLARS, setOf(issuer.party), + builder.notary, builder.lockId, setOf(OpaqueBytes.of(13))) + assertThat(exitStates).hasSize(0) + } + } /** * USE CASE demonstrations (outside of mainline Corda) * diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 93badf7b85..a123337b7b 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -54,7 +54,6 @@ class TraderDemoTest : IntegrationTest() { startFlow(), all())) driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { - defaultNotaryNode.getOrThrow() val (nodeA, nodeB, bankNode) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), diff --git a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index 6b5302e8f9..643eb8a38e 100644 --- a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -21,6 +21,7 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.node.internal.NodeStartup import net.corda.testing.common.internal.ProjectStructure.projectRootDir +import net.corda.testing.core.BOB_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_BANK_B_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME @@ -89,9 +90,20 @@ class DriverTests : IntegrationTest() { } } + @Test + fun `default notary is visible when the startNode future completes`() { + // Based on local testing, running this 3 times gives us a high confidence that we'll spot if the feature is not working + repeat(3) { + driver(DriverParameters(startNodesInProcess = true)) { + val bob = startNode(providedName = BOB_NAME).getOrThrow() + assertThat(bob.rpc.networkMapSnapshot().flatMap { it.legalIdentities }).contains(defaultNotaryIdentity) + } + } + } + @Test fun `random free port allocation`() { - val nodeHandle = driver(DriverParameters(portAllocation = RandomFree)) { + val nodeHandle = driver(DriverParameters(portAllocation = RandomFree, notarySpecs = emptyList())) { val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME) nodeMustBeUp(nodeInfo) } @@ -103,10 +115,7 @@ class DriverTests : IntegrationTest() { // Make sure we're using the log4j2 config which writes to the log file val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml" assertThat(logConfigFile).isRegularFile() - driver(DriverParameters( - isDebug = true, - systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()) - )) { + driver(DriverParameters(isDebug = true,notarySpecs = emptyList(), systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } @@ -116,7 +125,7 @@ class DriverTests : IntegrationTest() { @Test fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() { - driver(DriverParameters(startNodesInProcess = false, jmxPolicy = JmxPolicy(true))) { + driver(DriverParameters(startNodesInProcess = false, jmxPolicy = JmxPolicy(true), notarySpecs = emptyList())) { // start another node so we gain access to node JMX metrics val webAddress = NetworkHostAndPort("localhost", 7006) startNode(providedName = DUMMY_REGULATOR_NAME, @@ -145,33 +154,32 @@ class DriverTests : IntegrationTest() { @Test fun `driver rejects multiple nodes with the same name`() { - - driver(DriverParameters(startNodesInProcess = true)) { - - assertThatThrownBy { listOf(newNode(DUMMY_BANK_A_NAME)(), newNode(DUMMY_BANK_B_NAME)(), newNode(DUMMY_BANK_A_NAME)()).transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + assertThatThrownBy { + listOf( + newNode(DUMMY_BANK_A_NAME)(), + newNode(DUMMY_BANK_B_NAME)(), + newNode(DUMMY_BANK_A_NAME)() + ).transpose().getOrThrow() + }.isInstanceOf(IllegalArgumentException::class.java) } } @Test fun `driver rejects multiple nodes with the same name parallel`() { - - driver(DriverParameters(startNodesInProcess = true)) { - + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) - - assertThatThrownBy { nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) + assertThatThrownBy { + nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() + }.isInstanceOf(IllegalArgumentException::class.java) } } @Test fun `driver allows reusing names of nodes that have been stopped`() { - - driver(DriverParameters(startNodesInProcess = true)) { - + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow() - nodeA.stop() - assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException() } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt index 44deb54d9f..de3828a2de 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt @@ -78,7 +78,8 @@ interface DriverDSL { * @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted * as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate * megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes. - * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available. + * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available and + * it sees all previously started nodes, including the notaries. */ fun startNode( defaultParameters: NodeParameters = NodeParameters(), diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index ee96e62db0..d5a24ee12a 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -29,14 +29,12 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.NetworkMapCache -import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.node.NodeRegistrationOption import net.corda.node.internal.Node -import net.corda.node.internal.NodeStartup import net.corda.node.internal.StartedNode import net.corda.node.services.Permissions import net.corda.node.services.config.* @@ -67,8 +65,7 @@ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT import okhttp3.OkHttpClient import okhttp3.Request -import rx.Observable -import rx.observables.ConnectableObservable +import rx.Subscription import rx.schedulers.Schedulers import java.lang.management.ManagementFactory import java.net.ConnectException @@ -83,11 +80,10 @@ import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger +import kotlin.collections.HashMap import kotlin.concurrent.thread import net.corda.nodeapi.internal.config.User as InternalUser @@ -112,12 +108,11 @@ class DriverDSLImpl( override val shutdownManager get() = _shutdownManager!! private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() // Map from a nodes legal name to an observable emitting the number of nodes in its network map. - private val countObservables = ConcurrentHashMap>() - private val nodeNames = mutableSetOf() + private val networkVisibilityController = NetworkVisibilityController() /** - * Future which completes when the network map is available, whether a local one or one from the CZ. This future acts - * as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which - * is null if the network map is being provided by the CZ. + * Future which completes when the network map infrastructure is available, whether a local one or one from the CZ. + * This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] + * object, which is null if the network map is being provided by the CZ. */ private lateinit var networkMapAvailability: CordaFuture private lateinit var _notaries: CordaFuture> @@ -130,13 +125,9 @@ class DriverDSLImpl( private val state = ThreadBox(State()) //TODO: remove this once we can bundle quasar properly. - private val quasarJarPath: String by lazy { - resolveJar(".*quasar.*\\.jar$") - } + private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") } - private val jolokiaJarPath: String by lazy { - resolveJar(".*jolokia-jvm-.*-agent\\.jar$") - } + private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") } private fun resolveJar(jarNamePattern: String): String { return try { @@ -199,12 +190,7 @@ class DriverDSLImpl( val p2pAddress = portAllocation.nextHostAndPort() // TODO: Derive name from the full picked name, don't just wrap the common name val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB") - synchronized(nodeNames) { - val wasANewNode = nodeNames.add(name) - if (!wasANewNode) { - throw IllegalArgumentException("Node with name $name is already started or starting.") - } - } + val registrationFuture = if (compatibilityZone?.rootCert != null) { // We don't need the network map to be available to be able to register the node startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url) @@ -272,14 +258,20 @@ class DriverDSLImpl( return if (startNodesInProcess) { executorService.fork { - NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL), NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)).buildKeystore() + NetworkRegistrationHelper( + config.corda, + HTTPNetworkRegistrationService(compatibilityZoneURL), + NodeRegistrationOption(rootTruststorePath, rootTruststorePassword) + ).buildKeystore() config } } else { - startOutOfProcessMiniNode(config, + startOutOfProcessMiniNode( + config, "--initial-registration", "--network-root-truststore=${rootTruststorePath.toAbsolutePath()}", - "--network-root-truststore-password=$rootTruststorePassword").map { config } + "--network-root-truststore-password=$rootTruststorePassword" + ).map { config } } } @@ -581,54 +573,6 @@ class DriverDSLImpl( return driverDirectory / nodeDirectoryName } - /** - * @nodeName the name of the node which performs counting - * @param initial number of nodes currently in the network map of a running node. - * @param networkMapCacheChangeObservable an observable returning the updates to the node network map. - * @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes - * the initial value emitted is always [initial] - */ - private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable): - ConnectableObservable { - val count = AtomicInteger(initial) - return networkMapCacheChangeObservable.map { - log.debug("nodeCountObservable for '$nodeName' received '$it'") - when (it) { - is NetworkMapCache.MapChange.Added -> count.incrementAndGet() - is NetworkMapCache.MapChange.Removed -> count.decrementAndGet() - is NetworkMapCache.MapChange.Modified -> count.get() - } - }.startWith(initial).replay() - } - - /** - * @param rpc the [CordaRPCOps] of a newly started node. - * @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes - * equal to the number of running nodes. The future will yield the number of connected nodes. - */ - private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture { - val (snapshot, updates) = rpc.networkMapFeed() - val nodeName = rpc.nodeInfo().legalIdentities[0].name - val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates) - countObservables[nodeName] = counterObservable - /* TODO: this might not always be the exact number of nodes one has to wait for, - * for example in the following sequence - * 1 start 3 nodes in order, A, B, C. - * 2 before the future returned by this function resolves, kill B - * At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes. - */ - val requiredNodes = countObservables.size - - // This is an observable which yield the minimum number of nodes in each node network map. - val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array -> - log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}") - args.map { it as Int }.min() ?: 0 - } - val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture() - counterObservable.connect() - return future - } - /** * Start the node with the given flag which is expected to start the node for some function, which once complete will * terminate the node. @@ -658,16 +602,14 @@ class DriverDSLImpl( startInProcess: Boolean?, maximumHeapSize: String, localNetworkMap: LocalNetworkMap?): CordaFuture { + val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName) val baseDirectory = config.corda.baseDirectory.createDirectories() localNetworkMap?.networkParametersCopier?.install(baseDirectory) localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory) val onNodeExit: () -> Unit = { localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory) - countObservables.remove(config.corda.myLegalName) - synchronized(nodeNames) { - nodeNames.remove(config.corda.myLegalName) - } + visibilityHandle.close() } val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") } @@ -684,7 +626,7 @@ class DriverDSLImpl( ) return nodeAndThreadFuture.flatMap { (node, thread) -> establishRpc(config, openFuture()).flatMap { rpc -> - allNodesConnected(rpc).map { + visibilityHandle.listen(rpc).map { InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node) } } @@ -707,12 +649,13 @@ class DriverDSLImpl( } establishRpc(config, processDeathFuture).flatMap { rpc -> // Check for all nodes to have all other nodes in background in case RPC is failing over: - val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } + val networkMapFuture = executorService.fork { visibilityHandle.listen(rpc) }.flatMap { it } firstOf(processDeathFuture, networkMapFuture) { if (it == processDeathFuture) { throw ListenProcessDeathException(config.corda.p2pAddress, process) } - // Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap. + // Will interrupt polling for process death as this is no longer relevant since the process been + // successfully started and reflected itself in the NetworkMap. processDeathFuture.cancel(true) log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit) @@ -731,7 +674,7 @@ class DriverDSLImpl( /** * The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories. */ - private inner class LocalNetworkMap(notaryInfos: List) { + inner class LocalNetworkMap(notaryInfos: List) { val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos)) // TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/ // This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system. @@ -743,12 +686,12 @@ class DriverDSLImpl( * Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration]. * Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration]. */ - private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().also { nodeConfiguration -> - val errors = nodeConfiguration.validate() - if (errors.isNotEmpty()) { - throw IllegalStateException("Invalid node configuration. Errors where:${System.lineSeparator()}${errors.joinToString(System.lineSeparator())}") + private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()) { + init { + val errors = corda.validate() + require(errors.isEmpty()) { "Invalid node configuration. Errors where:\n${errors.joinToString("\n")}" } } - }) + } companion object { internal val log = contextLogger() @@ -916,6 +859,63 @@ class DriverDSLImpl( } } +/** + * Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all + * current nodes see everyone. + */ +private class NetworkVisibilityController { + private val nodeVisibilityHandles = ThreadBox(HashMap()) + + fun register(name: CordaX500Name): VisibilityHandle { + val handle = VisibilityHandle() + nodeVisibilityHandles.locked { + require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" } + } + return handle + } + + private fun checkIfAllVisible() { + nodeVisibilityHandles.locked { + val minView = values.stream().mapToInt { it.visibleNodeCount }.min().orElse(0) + if (minView >= size) { + values.forEach { it.future.set(Unit) } + } + } + } + + inner class VisibilityHandle : AutoCloseable { + internal val future = openFuture() + internal var visibleNodeCount = 0 + private var subscription: Subscription? = null + + fun listen(rpc: CordaRPCOps): CordaFuture { + check(subscription == null) + val (snapshot, updates) = rpc.networkMapFeed() + visibleNodeCount = snapshot.size + checkIfAllVisible() + subscription = updates.subscribe { when (it) { + is NetworkMapCache.MapChange.Added -> { + visibleNodeCount++ + checkIfAllVisible() + } + is NetworkMapCache.MapChange.Removed -> { + visibleNodeCount-- + checkIfAllVisible() + } + } } + return future + } + + override fun close() { + subscription?.unsubscribe() + nodeVisibilityHandles.locked { + values -= this@VisibilityHandle + checkIfAllVisible() + } + } + } +} + interface InternalDriverDSL : DriverDSL, CordformContext { private companion object { private val DEFAULT_POLL_INTERVAL = 500.millis