From 2dc2a8dc1907753aa2452864d61c2c53a843dd8b Mon Sep 17 00:00:00 2001 From: szymonsztuka Date: Wed, 11 Apr 2018 13:36:29 +0100 Subject: [PATCH 1/4] ENT-1727 Fix cash selection with PostgreSQL. (#2949) Change conversion to toStringShort() instead of toBase58String() - as done for H2 Cash Selection. Fix withIssuerRefs case - iterate via list of IssuerRefs and setBytes instead of setArray of BYTEA. Add test for Cash Selection with issuerRef. --- .../selection/CashSelectionPostgreSQLImpl.kt | 17 ++++---- .../node/services/vault/VaultQueryTests.kt | 40 +++++++++++++++++-- 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt index fcefd25f83..2fe5a6d0aa 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionPostgreSQLImpl.kt @@ -1,6 +1,7 @@ package net.corda.finance.contracts.asset.cash.selection import net.corda.core.contracts.Amount +import net.corda.core.crypto.toStringShort import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.utilities.* @@ -44,8 +45,12 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { " AND vs.notary_name = ?" else "") + (if (onlyFromIssuerParties.isNotEmpty()) " AND ccs.issuer_key_hash = ANY (?)" else "") + - (if (withIssuerRefs.isNotEmpty()) - " AND ccs.issuer_ref = ANY (?)" else "") + + (if (withIssuerRefs.isNotEmpty()) { + val repeats = generateSequence { "?" } + .take(withIssuerRefs.size) + .joinToString(",") + " AND ccs.issuer_ref IN ($repeats)" + } else "") + """) nested WHERE nested.total < ? """ @@ -60,14 +65,12 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() { } if (onlyFromIssuerParties.isNotEmpty()) { val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map - { it.owningKey.toBase58String() }.toTypedArray()) + { it.owningKey.toStringShort() }.toTypedArray()) statement.setArray(3 + paramOffset, issuerKeys) paramOffset += 1 } - if (withIssuerRefs.isNotEmpty()) { - val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map - { it.bytes }.toTypedArray()) - statement.setArray(3 + paramOffset, issuerRefs) + withIssuerRefs.map { it.bytes }.forEach { + statement.setBytes( 3 + paramOffset, it) paramOffset += 1 } statement.setLong(3 + paramOffset, amount.quantity) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt index 9f8663c47d..92e33ab2e9 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt @@ -9,15 +9,14 @@ import net.corda.core.internal.packageName import net.corda.core.node.services.* import net.corda.core.node.services.vault.* import net.corda.core.node.services.vault.QueryCriteria.* -import net.corda.core.utilities.NonEmptySet -import net.corda.core.utilities.days -import net.corda.core.utilities.seconds -import net.corda.core.utilities.toHexString +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.* import net.corda.finance.* import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.Commodity import net.corda.finance.contracts.DealState import net.corda.finance.contracts.asset.Cash +import net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.schemas.CashSchemaV1.PersistentCashState import net.corda.finance.schemas.CommercialPaperSchemaV1 @@ -2056,6 +2055,39 @@ class VaultQueryTests { } } + @Test + fun `unconsumedCashStatesForSpending_single_issuer_reference`() { + database.transaction { + vaultFiller.fillWithSomeTestCash(1000.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER) + } + database.transaction { + val builder = TransactionBuilder() + val issuer = DUMMY_CASH_ISSUER + val exitStates = AbstractCashSelection + .getInstance { services.jdbcSession().metaData } + .unconsumedCashStatesForSpending(services, 300.DOLLARS, setOf(issuer.party), + builder.notary, builder.lockId, setOf(issuer.reference)) + + assertThat(exitStates).hasSize(1) + assertThat(exitStates[0].state.data.amount.quantity).isEqualTo(100000) + } + } + + @Test + fun `unconsumedCashStatesForSpending_single_issuer_reference_not_matching`() { + database.transaction { + vaultFiller.fillWithSomeTestCash(1000.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER) + } + database.transaction { + val builder = TransactionBuilder() + val issuer = DUMMY_CASH_ISSUER + val exitStates = AbstractCashSelection + .getInstance { services.jdbcSession().metaData } + .unconsumedCashStatesForSpending(services, 300.DOLLARS, setOf(issuer.party), + builder.notary, builder.lockId, setOf(OpaqueBytes.of(13))) + assertThat(exitStates).hasSize(0) + } + } /** * USE CASE demonstrations (outside of mainline Corda) * From f6e79cdbea276eeaf3ef2b44cfff61df3878a17a Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Wed, 11 Apr 2018 13:38:06 +0100 Subject: [PATCH 2/4] Upgrade to H2 1.4.197. (#2918) --- build.gradle | 2 +- docs/source/changelog.rst | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 55d66b0994..d2d38f8fde 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ buildscript { ext.jopt_simple_version = '5.0.2' ext.jansi_version = '1.14' ext.hibernate_version = '5.2.6.Final' - ext.h2_version = '1.4.194' // Update docs if renamed or removed. + ext.h2_version = '1.4.197' // Update docs if renamed or removed. ext.postgresql_version = '42.1.4' ext.rxjava_version = '1.2.4' ext.dokka_version = '0.9.16-eap-2' diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 2f967ce0ca..64cd538e2d 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -44,6 +44,8 @@ Unreleased * java.security.cert.X509CRL serialization support added. +* Upgraded H2 to v1.4.197. + * Shell (embedded available only in dev mode or via SSH) connects to the node via RPC instead of using the ``CordaRPCOps`` object directly. To enable RPC connectivity ensure node’s ``rpcSettings.address`` and ``rpcSettings.adminAddress`` settings are present. From b46c3b89bdcd36ed6555fe0a77b77d30ae7eb06e Mon Sep 17 00:00:00 2001 From: szymonsztuka Date: Wed, 11 Apr 2018 14:06:13 +0100 Subject: [PATCH 3/4] CORDA-1326 Add default shell user only when local shell is started. (#2953) In productionMode ssh could login to a node using "shell/shell" which is supposed to be available only in dev mode (to enable embedded shell). --- node/src/main/kotlin/net/corda/node/internal/Node.kt | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 0c05f24bc7..0ea0fc399b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -28,11 +28,8 @@ import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.SchemaService -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.SecurityConfiguration -import net.corda.node.services.config.VerifierType +import net.corda.node.services.config.* import net.corda.node.services.config.shell.localShellUser -import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.messaging.* import net.corda.node.services.rpc.ArtemisRpcBroker import net.corda.node.services.transactions.InMemoryTransactionVerifierService @@ -163,7 +160,7 @@ open class Node(configuration: NodeConfiguration, val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers) securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) { - if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this + if (configuration.shouldStartLocalShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this } if (!configuration.messagingServerExternal) { From f88542faa21e067f6656403cad89630f476e9a84 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Wed, 11 Apr 2018 15:33:55 +0100 Subject: [PATCH 4/4] CORDA-1095: Fixed rare race where the startNode future completes before the default notary is visible (#2947) --- .../corda/finance/flows/CashSelectionTest.kt | 10 +- .../FlowsDrainingModeContentionTest.kt | 4 - .../persistence/NodeStatePersistenceTests.kt | 4 - .../registration/NodeRegistrationTest.kt | 9 +- .../services/schema/NodeSchemaServiceTest.kt | 4 +- .../net/corda/traderdemo/TraderDemoTest.kt | 1 - .../net/corda/testing/driver/DriverTests.kt | 58 +++--- .../net/corda/testing/driver/DriverDSL.kt | 3 +- .../testing/node/internal/DriverDSLImpl.kt | 174 +++++++++--------- 9 files changed, 130 insertions(+), 137 deletions(-) diff --git a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt index 948bd80f22..14a6531d25 100644 --- a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt +++ b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt @@ -1,13 +1,10 @@ package net.corda.finance.flows -import net.corda.core.internal.packageName import net.corda.core.messaging.startFlow import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS -import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.getCashBalance -import net.corda.finance.schemas.CashSchemaV1 import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver import net.corda.testing.driver.internal.InProcessImpl @@ -17,11 +14,8 @@ import org.junit.Test class CashSelectionTest { @Test - fun unconsumed_cash_states() { - - driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) { - - defaultNotaryNode.getOrThrow() + fun `unconsumed cash states`() { + driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { val node = startNode().getOrThrow() as InProcessImpl val issuerRef = OpaqueBytes.of(0) val issuedAmount = 1000.DOLLARS diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt index 27e542d5a8..7a3ed586cd 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt @@ -52,14 +52,10 @@ class FlowsDrainingModeContentionTest { @Test fun `draining mode does not deadlock with acks between 2 nodes`() { - val message = "Ground control to Major Tom" - driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { - val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() - defaultNotaryNode.getOrThrow() val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password) val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity) diff --git a/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt b/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt index 7f7153d94c..27b2f22f78 100644 --- a/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/persistence/NodeStatePersistenceTests.kt @@ -42,8 +42,6 @@ class NodeStatePersistenceTests { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name - // Ensure the notary node has finished starting up, before starting a flow that needs a notary - defaultNotaryNode.getOrThrow() CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() } @@ -76,8 +74,6 @@ class NodeStatePersistenceTests { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name - // Ensure the notary node has finished starting up, before starting a flow that needs a notary - defaultNotaryNode.getOrThrow() CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use { it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow() } diff --git a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt index 685b47c057..0bfaa223a1 100644 --- a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt @@ -90,16 +90,11 @@ class NodeRegistrationTest { notarySpecs = listOf(NotarySpec(notaryName)), extraCordappPackagesToScan = listOf("net.corda.finance") ) { - val nodes = listOf( + val (alice, genevieve) = listOf( startNode(providedName = aliceName), - startNode(providedName = genevieveName), - defaultNotaryNode + startNode(providedName = genevieveName) ).transpose().getOrThrow() - log.info("Nodes started") - - val (alice, genevieve) = nodes - assertThat(registrationHandler.idsPolled).containsOnly( aliceName.organisation, genevieveName.organisation, diff --git a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt index 2734ed17b0..b4536dd550 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/NodeSchemaServiceTest.kt @@ -107,8 +107,8 @@ class NodeSchemaServiceTest { @Test fun `check node runs inclusive of notary node schema set using driverDSL`() { driver(DriverParameters(startNodesInProcess = true)) { - val notaryNode = defaultNotaryNode.getOrThrow().rpc.startFlow(::MappedSchemasFlow) - val mappedSchemas = notaryNode.returnValue.getOrThrow() + val notary = defaultNotaryNode.getOrThrow() + val mappedSchemas = notary.rpc.startFlow(::MappedSchemasFlow).returnValue.getOrThrow() // check against NodeCore + NodeNotary Schemas assertTrue(mappedSchemas.contains(NodeCoreV1.name)) assertTrue(mappedSchemas.contains(NodeNotaryV1.name)) diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index c0d3844960..938e4243f5 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -34,7 +34,6 @@ class TraderDemoTest { startFlow(), all())) driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { - defaultNotaryNode.getOrThrow() val (nodeA, nodeB, bankNode) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), diff --git a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index 7d23099fa6..dd8a94dc4b 100644 --- a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -11,15 +11,13 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.node.internal.NodeStartup import net.corda.testing.common.internal.ProjectStructure.projectRootDir -import net.corda.testing.node.internal.addressMustBeBound -import net.corda.testing.node.internal.addressMustNotBeBound -import net.corda.testing.node.internal.internalDriver -import net.corda.testing.core.DUMMY_BANK_A_NAME -import net.corda.testing.core.DUMMY_BANK_B_NAME -import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.* import net.corda.testing.driver.internal.RandomFree import net.corda.testing.http.HttpApi import net.corda.testing.node.NotarySpec +import net.corda.testing.node.internal.addressMustBeBound +import net.corda.testing.node.internal.addressMustNotBeBound +import net.corda.testing.node.internal.internalDriver import org.assertj.core.api.Assertions.* import org.json.simple.JSONObject import org.junit.Test @@ -70,9 +68,20 @@ class DriverTests { } } + @Test + fun `default notary is visible when the startNode future completes`() { + // Based on local testing, running this 3 times gives us a high confidence that we'll spot if the feature is not working + repeat(3) { + driver(DriverParameters(startNodesInProcess = true)) { + val bob = startNode(providedName = BOB_NAME).getOrThrow() + assertThat(bob.rpc.networkMapSnapshot().flatMap { it.legalIdentities }).contains(defaultNotaryIdentity) + } + } + } + @Test fun `random free port allocation`() { - val nodeHandle = driver(DriverParameters(portAllocation = RandomFree)) { + val nodeHandle = driver(DriverParameters(portAllocation = RandomFree, notarySpecs = emptyList())) { val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME) nodeMustBeUp(nodeInfo) } @@ -84,7 +93,11 @@ class DriverTests { // Make sure we're using the log4j2 config which writes to the log file val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml" assertThat(logConfigFile).isRegularFile() - driver(DriverParameters(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + driver(DriverParameters( + isDebug = true, + notarySpecs = emptyList(), + systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()) + )) { val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } @@ -94,7 +107,7 @@ class DriverTests { @Test fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() { - driver(DriverParameters(startNodesInProcess = false)) { + driver(DriverParameters(startNodesInProcess = false, notarySpecs = emptyList())) { // start another node so we gain access to node JMX metrics val webAddress = NetworkHostAndPort("localhost", 7006) startNode(providedName = DUMMY_REGULATOR_NAME, @@ -123,33 +136,32 @@ class DriverTests { @Test fun `driver rejects multiple nodes with the same name`() { - - driver(DriverParameters(startNodesInProcess = true)) { - - assertThatThrownBy { listOf(newNode(DUMMY_BANK_A_NAME)(), newNode(DUMMY_BANK_B_NAME)(), newNode(DUMMY_BANK_A_NAME)()).transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + assertThatThrownBy { + listOf( + newNode(DUMMY_BANK_A_NAME)(), + newNode(DUMMY_BANK_B_NAME)(), + newNode(DUMMY_BANK_A_NAME)() + ).transpose().getOrThrow() + }.isInstanceOf(IllegalArgumentException::class.java) } } @Test fun `driver rejects multiple nodes with the same name parallel`() { - - driver(DriverParameters(startNodesInProcess = true)) { - + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME)) - - assertThatThrownBy { nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java) + assertThatThrownBy { + nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() + }.isInstanceOf(IllegalArgumentException::class.java) } } @Test fun `driver allows reusing names of nodes that have been stopped`() { - - driver(DriverParameters(startNodesInProcess = true)) { - + driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow() - nodeA.stop() - assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException() } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt index 71c6db96b1..6f8e3c6dbf 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt @@ -68,7 +68,8 @@ interface DriverDSL { * @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted * as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate * megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes. - * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available. + * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available and + * it sees all previously started nodes, including the notaries. */ fun startNode( defaultParameters: NodeParameters = NodeParameters(), diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 06497dfc54..08f7028bb3 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -19,14 +19,12 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.NetworkMapCache -import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.node.NodeRegistrationOption import net.corda.node.internal.Node -import net.corda.node.internal.NodeStartup import net.corda.node.internal.StartedNode import net.corda.node.services.Permissions import net.corda.node.services.config.* @@ -57,8 +55,7 @@ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT import okhttp3.OkHttpClient import okhttp3.Request -import rx.Observable -import rx.observables.ConnectableObservable +import rx.Subscription import rx.schedulers.Schedulers import java.lang.management.ManagementFactory import java.net.ConnectException @@ -73,11 +70,10 @@ import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* -import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger +import kotlin.collections.HashMap import kotlin.concurrent.thread import net.corda.nodeapi.internal.config.User as InternalUser @@ -102,12 +98,11 @@ class DriverDSLImpl( override val shutdownManager get() = _shutdownManager!! private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() // Map from a nodes legal name to an observable emitting the number of nodes in its network map. - private val countObservables = ConcurrentHashMap>() - private val nodeNames = mutableSetOf() + private val networkVisibilityController = NetworkVisibilityController() /** - * Future which completes when the network map is available, whether a local one or one from the CZ. This future acts - * as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which - * is null if the network map is being provided by the CZ. + * Future which completes when the network map infrastructure is available, whether a local one or one from the CZ. + * This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] + * object, which is null if the network map is being provided by the CZ. */ private lateinit var networkMapAvailability: CordaFuture private lateinit var _notaries: CordaFuture> @@ -120,13 +115,9 @@ class DriverDSLImpl( private val state = ThreadBox(State()) //TODO: remove this once we can bundle quasar properly. - private val quasarJarPath: String by lazy { - resolveJar(".*quasar.*\\.jar$") - } + private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") } - private val jolokiaJarPath: String by lazy { - resolveJar(".*jolokia-jvm-.*-agent\\.jar$") - } + private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") } private fun resolveJar(jarNamePattern: String): String { return try { @@ -189,12 +180,7 @@ class DriverDSLImpl( val p2pAddress = portAllocation.nextHostAndPort() // TODO: Derive name from the full picked name, don't just wrap the common name val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB") - synchronized(nodeNames) { - val wasANewNode = nodeNames.add(name) - if (!wasANewNode) { - throw IllegalArgumentException("Node with name $name is already started or starting.") - } - } + val registrationFuture = if (compatibilityZone?.rootCert != null) { // We don't need the network map to be available to be able to register the node startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url) @@ -262,14 +248,20 @@ class DriverDSLImpl( return if (startNodesInProcess) { executorService.fork { - NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL), NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)).buildKeystore() + NetworkRegistrationHelper( + config.corda, + HTTPNetworkRegistrationService(compatibilityZoneURL), + NodeRegistrationOption(rootTruststorePath, rootTruststorePassword) + ).buildKeystore() config } } else { - startOutOfProcessMiniNode(config, + startOutOfProcessMiniNode( + config, "--initial-registration", "--network-root-truststore=${rootTruststorePath.toAbsolutePath()}", - "--network-root-truststore-password=$rootTruststorePassword").map { config } + "--network-root-truststore-password=$rootTruststorePassword" + ).map { config } } } @@ -575,54 +567,6 @@ class DriverDSLImpl( return driverDirectory / nodeDirectoryName } - /** - * @nodeName the name of the node which performs counting - * @param initial number of nodes currently in the network map of a running node. - * @param networkMapCacheChangeObservable an observable returning the updates to the node network map. - * @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes - * the initial value emitted is always [initial] - */ - private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable): - ConnectableObservable { - val count = AtomicInteger(initial) - return networkMapCacheChangeObservable.map { - log.debug("nodeCountObservable for '$nodeName' received '$it'") - when (it) { - is NetworkMapCache.MapChange.Added -> count.incrementAndGet() - is NetworkMapCache.MapChange.Removed -> count.decrementAndGet() - is NetworkMapCache.MapChange.Modified -> count.get() - } - }.startWith(initial).replay() - } - - /** - * @param rpc the [CordaRPCOps] of a newly started node. - * @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes - * equal to the number of running nodes. The future will yield the number of connected nodes. - */ - private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture { - val (snapshot, updates) = rpc.networkMapFeed() - val nodeName = rpc.nodeInfo().legalIdentities[0].name - val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates) - countObservables[nodeName] = counterObservable - /* TODO: this might not always be the exact number of nodes one has to wait for, - * for example in the following sequence - * 1 start 3 nodes in order, A, B, C. - * 2 before the future returned by this function resolves, kill B - * At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes. - */ - val requiredNodes = countObservables.size - - // This is an observable which yield the minimum number of nodes in each node network map. - val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array -> - log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}") - args.map { it as Int }.min() ?: 0 - } - val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture() - counterObservable.connect() - return future - } - /** * Start the node with the given flag which is expected to start the node for some function, which once complete will * terminate the node. @@ -652,16 +596,14 @@ class DriverDSLImpl( startInProcess: Boolean?, maximumHeapSize: String, localNetworkMap: LocalNetworkMap?): CordaFuture { + val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName) val baseDirectory = config.corda.baseDirectory.createDirectories() localNetworkMap?.networkParametersCopier?.install(baseDirectory) localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory) val onNodeExit: () -> Unit = { localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory) - countObservables.remove(config.corda.myLegalName) - synchronized(nodeNames) { - nodeNames.remove(config.corda.myLegalName) - } + visibilityHandle.close() } val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") } @@ -678,7 +620,7 @@ class DriverDSLImpl( ) return nodeAndThreadFuture.flatMap { (node, thread) -> establishRpc(config, openFuture()).flatMap { rpc -> - allNodesConnected(rpc).map { + visibilityHandle.listen(rpc).map { InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node) } } @@ -701,12 +643,13 @@ class DriverDSLImpl( } establishRpc(config, processDeathFuture).flatMap { rpc -> // Check for all nodes to have all other nodes in background in case RPC is failing over: - val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } + val networkMapFuture = executorService.fork { visibilityHandle.listen(rpc) }.flatMap { it } firstOf(processDeathFuture, networkMapFuture) { if (it == processDeathFuture) { throw ListenProcessDeathException(config.corda.p2pAddress, process) } - // Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap. + // Will interrupt polling for process death as this is no longer relevant since the process been + // successfully started and reflected itself in the NetworkMap. processDeathFuture.cancel(true) log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit) @@ -725,7 +668,7 @@ class DriverDSLImpl( /** * The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories. */ - private inner class LocalNetworkMap(notaryInfos: List) { + inner class LocalNetworkMap(notaryInfos: List) { val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos)) // TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/ // This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system. @@ -737,12 +680,12 @@ class DriverDSLImpl( * Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration]. * Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration]. */ - private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().also { nodeConfiguration -> - val errors = nodeConfiguration.validate() - if (errors.isNotEmpty()) { - throw IllegalStateException("Invalid node configuration. Errors where:${System.lineSeparator()}${errors.joinToString(System.lineSeparator())}") + private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()) { + init { + val errors = corda.validate() + require(errors.isEmpty()) { "Invalid node configuration. Errors where:\n${errors.joinToString("\n")}" } } - }) + } companion object { internal val log = contextLogger() @@ -910,6 +853,63 @@ class DriverDSLImpl( } } +/** + * Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all + * current nodes see everyone. + */ +private class NetworkVisibilityController { + private val nodeVisibilityHandles = ThreadBox(HashMap()) + + fun register(name: CordaX500Name): VisibilityHandle { + val handle = VisibilityHandle() + nodeVisibilityHandles.locked { + require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" } + } + return handle + } + + private fun checkIfAllVisible() { + nodeVisibilityHandles.locked { + val minView = values.stream().mapToInt { it.visibleNodeCount }.min().orElse(0) + if (minView >= size) { + values.forEach { it.future.set(Unit) } + } + } + } + + inner class VisibilityHandle : AutoCloseable { + internal val future = openFuture() + internal var visibleNodeCount = 0 + private var subscription: Subscription? = null + + fun listen(rpc: CordaRPCOps): CordaFuture { + check(subscription == null) + val (snapshot, updates) = rpc.networkMapFeed() + visibleNodeCount = snapshot.size + checkIfAllVisible() + subscription = updates.subscribe { when (it) { + is NetworkMapCache.MapChange.Added -> { + visibleNodeCount++ + checkIfAllVisible() + } + is NetworkMapCache.MapChange.Removed -> { + visibleNodeCount-- + checkIfAllVisible() + } + } } + return future + } + + override fun close() { + subscription?.unsubscribe() + nodeVisibilityHandles.locked { + values -= this@VisibilityHandle + checkIfAllVisible() + } + } + } +} + interface InternalDriverDSL : DriverDSL, CordformContext { private companion object { private val DEFAULT_POLL_INTERVAL = 500.millis