mirror of
https://github.com/corda/corda.git
synced 2025-01-14 00:39:57 +00:00
Merge remote-tracking branch 'open/master' into os-merge-f88542f
# Conflicts: # docs/source/changelog.rst # finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt # node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt # testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt
This commit is contained in:
commit
9b58114146
@ -69,7 +69,7 @@ buildscript {
|
|||||||
ext.jopt_simple_version = '5.0.2'
|
ext.jopt_simple_version = '5.0.2'
|
||||||
ext.jansi_version = '1.14'
|
ext.jansi_version = '1.14'
|
||||||
ext.hibernate_version = '5.2.6.Final'
|
ext.hibernate_version = '5.2.6.Final'
|
||||||
ext.h2_version = '1.4.194' // Update docs if renamed or removed.
|
ext.h2_version = '1.4.197' // Update docs if renamed or removed.
|
||||||
ext.postgresql_version = '42.1.4'
|
ext.postgresql_version = '42.1.4'
|
||||||
ext.rxjava_version = '1.2.4'
|
ext.rxjava_version = '1.2.4'
|
||||||
ext.dokka_version = '0.9.16-eap-2'
|
ext.dokka_version = '0.9.16-eap-2'
|
||||||
|
@ -44,9 +44,15 @@ Unreleased
|
|||||||
|
|
||||||
* java.security.cert.X509CRL serialization support added.
|
* java.security.cert.X509CRL serialization support added.
|
||||||
|
|
||||||
* Added ``NetworkMapCache.getNodesByLegalName`` for querying nodes belonging to a distributed service such as a notary cluster
|
* Upgraded H2 to v1.4.197.
|
||||||
where they all share a common identity. ``NetworkMapCache.getNodeByLegalName`` has been tightened to throw if more than
|
|
||||||
one node with the legal name is found.
|
* Shell (embedded available only in dev mode or via SSH) connects to the node via RPC instead of using the ``CordaRPCOps`` object directly.
|
||||||
|
To enable RPC connectivity ensure node’s ``rpcSettings.address`` and ``rpcSettings.adminAddress`` settings are present.
|
||||||
|
|
||||||
|
.. _changelog_v3:
|
||||||
|
|
||||||
|
Version 3.0
|
||||||
|
-----------
|
||||||
|
|
||||||
* Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated
|
* Per CorDapp configuration is now exposed. ``CordappContext`` now exposes a ``CordappConfig`` object that is populated
|
||||||
at CorDapp context creation time from a file source during runtime.
|
at CorDapp context creation time from a file source during runtime.
|
||||||
|
@ -1,13 +1,10 @@
|
|||||||
package net.corda.finance.flows
|
package net.corda.finance.flows
|
||||||
|
|
||||||
import net.corda.core.internal.packageName
|
|
||||||
import net.corda.core.messaging.startFlow
|
import net.corda.core.messaging.startFlow
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.finance.DOLLARS
|
import net.corda.finance.DOLLARS
|
||||||
import net.corda.finance.contracts.asset.Cash
|
|
||||||
import net.corda.finance.contracts.getCashBalance
|
import net.corda.finance.contracts.getCashBalance
|
||||||
import net.corda.finance.schemas.CashSchemaV1
|
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.core.BOB_NAME
|
import net.corda.testing.core.BOB_NAME
|
||||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||||
@ -31,11 +28,8 @@ class CashSelectionTest : IntegrationTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun unconsumed_cash_states() {
|
fun `unconsumed cash states`() {
|
||||||
|
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||||
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) {
|
|
||||||
|
|
||||||
defaultNotaryNode.getOrThrow()
|
|
||||||
val node = startNode().getOrThrow() as InProcessImpl
|
val node = startNode().getOrThrow() as InProcessImpl
|
||||||
val issuerRef = OpaqueBytes.of(0)
|
val issuerRef = OpaqueBytes.of(0)
|
||||||
val issuedAmount = 1000.DOLLARS
|
val issuedAmount = 1000.DOLLARS
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
package net.corda.finance.contracts.asset.cash.selection
|
package net.corda.finance.contracts.asset.cash.selection
|
||||||
|
|
||||||
import net.corda.core.contracts.Amount
|
import net.corda.core.contracts.Amount
|
||||||
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
@ -54,8 +55,12 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
|||||||
" AND vs.notary_name = ?" else "") +
|
" AND vs.notary_name = ?" else "") +
|
||||||
(if (onlyFromIssuerParties.isNotEmpty())
|
(if (onlyFromIssuerParties.isNotEmpty())
|
||||||
" AND ccs.issuer_key_hash = ANY (?)" else "") +
|
" AND ccs.issuer_key_hash = ANY (?)" else "") +
|
||||||
(if (withIssuerRefs.isNotEmpty())
|
(if (withIssuerRefs.isNotEmpty()) {
|
||||||
" AND ccs.issuer_ref = ANY (?)" else "") +
|
val repeats = generateSequence { "?" }
|
||||||
|
.take(withIssuerRefs.size)
|
||||||
|
.joinToString(",")
|
||||||
|
" AND ccs.issuer_ref IN ($repeats)"
|
||||||
|
} else "") +
|
||||||
""")
|
""")
|
||||||
nested WHERE nested.total < ?
|
nested WHERE nested.total < ?
|
||||||
"""
|
"""
|
||||||
@ -70,14 +75,12 @@ class CashSelectionPostgreSQLImpl : AbstractCashSelection() {
|
|||||||
}
|
}
|
||||||
if (onlyFromIssuerParties.isNotEmpty()) {
|
if (onlyFromIssuerParties.isNotEmpty()) {
|
||||||
val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map
|
val issuerKeys = connection.createArrayOf("VARCHAR", onlyFromIssuerParties.map
|
||||||
{ it.owningKey.toBase58String() }.toTypedArray())
|
{ it.owningKey.toStringShort() }.toTypedArray())
|
||||||
statement.setArray(3 + paramOffset, issuerKeys)
|
statement.setArray(3 + paramOffset, issuerKeys)
|
||||||
paramOffset += 1
|
paramOffset += 1
|
||||||
}
|
}
|
||||||
if (withIssuerRefs.isNotEmpty()) {
|
withIssuerRefs.map { it.bytes }.forEach {
|
||||||
val issuerRefs = connection.createArrayOf("BYTEA", withIssuerRefs.map
|
statement.setBytes( 3 + paramOffset, it)
|
||||||
{ it.bytes }.toTypedArray())
|
|
||||||
statement.setArray(3 + paramOffset, issuerRefs)
|
|
||||||
paramOffset += 1
|
paramOffset += 1
|
||||||
}
|
}
|
||||||
statement.setLong(3 + paramOffset, amount.quantity)
|
statement.setLong(3 + paramOffset, amount.quantity)
|
||||||
|
@ -66,14 +66,10 @@ class FlowsDrainingModeContentionTest : IntegrationTest() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `draining mode does not deadlock with acks between 2 nodes`() {
|
fun `draining mode does not deadlock with acks between 2 nodes`() {
|
||||||
|
|
||||||
val message = "Ground control to Major Tom"
|
val message = "Ground control to Major Tom"
|
||||||
|
|
||||||
driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) {
|
driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) {
|
||||||
|
|
||||||
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
|
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
|
||||||
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
|
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
|
||||||
defaultNotaryNode.getOrThrow()
|
|
||||||
|
|
||||||
val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password)
|
val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password)
|
||||||
val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity)
|
val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity)
|
||||||
|
@ -62,8 +62,6 @@ class NodeStatePersistenceTests : IntegrationTest() {
|
|||||||
val nodeName = {
|
val nodeName = {
|
||||||
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
||||||
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
|
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
|
||||||
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
|
|
||||||
defaultNotaryNode.getOrThrow()
|
|
||||||
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
||||||
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
|
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
|
||||||
}
|
}
|
||||||
@ -96,8 +94,6 @@ class NodeStatePersistenceTests : IntegrationTest() {
|
|||||||
val nodeName = {
|
val nodeName = {
|
||||||
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
|
||||||
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
|
val nodeName = nodeHandle.nodeInfo.singleIdentity().name
|
||||||
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
|
|
||||||
defaultNotaryNode.getOrThrow()
|
|
||||||
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
CordaRPCClient(nodeHandle.rpcAddress).start(user.username, user.password).use {
|
||||||
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
|
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
|
||||||
}
|
}
|
||||||
|
@ -103,16 +103,11 @@ class NodeRegistrationTest : IntegrationTest() {
|
|||||||
notarySpecs = listOf(NotarySpec(notaryName)),
|
notarySpecs = listOf(NotarySpec(notaryName)),
|
||||||
extraCordappPackagesToScan = listOf("net.corda.finance")
|
extraCordappPackagesToScan = listOf("net.corda.finance")
|
||||||
) {
|
) {
|
||||||
val nodes = listOf(
|
val (alice, genevieve) = listOf(
|
||||||
startNode(providedName = aliceName),
|
startNode(providedName = aliceName),
|
||||||
startNode(providedName = genevieveName),
|
startNode(providedName = genevieveName)
|
||||||
defaultNotaryNode
|
|
||||||
).transpose().getOrThrow()
|
).transpose().getOrThrow()
|
||||||
|
|
||||||
log.info("Nodes started")
|
|
||||||
|
|
||||||
val (alice, genevieve) = nodes
|
|
||||||
|
|
||||||
assertThat(registrationHandler.idsPolled).containsOnly(
|
assertThat(registrationHandler.idsPolled).containsOnly(
|
||||||
aliceName.organisation,
|
aliceName.organisation,
|
||||||
genevieveName.organisation,
|
genevieveName.organisation,
|
||||||
|
@ -38,11 +38,8 @@ import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
|
|||||||
import net.corda.node.serialization.KryoServerSerializationScheme
|
import net.corda.node.serialization.KryoServerSerializationScheme
|
||||||
import net.corda.node.services.api.NodePropertiesStore
|
import net.corda.node.services.api.NodePropertiesStore
|
||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.*
|
||||||
import net.corda.node.services.config.SecurityConfiguration
|
|
||||||
import net.corda.node.services.config.VerifierType
|
|
||||||
import net.corda.node.services.config.shell.localShellUser
|
import net.corda.node.services.config.shell.localShellUser
|
||||||
import net.corda.node.services.config.shouldInitCrashShell
|
|
||||||
import net.corda.node.services.messaging.*
|
import net.corda.node.services.messaging.*
|
||||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||||
@ -174,7 +171,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
|
val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
|
||||||
|
|
||||||
securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
|
securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
|
||||||
if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this
|
if (configuration.shouldStartLocalShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!configuration.messagingServerExternal) {
|
if (!configuration.messagingServerExternal) {
|
||||||
|
@ -117,8 +117,8 @@ class NodeSchemaServiceTest {
|
|||||||
@Test
|
@Test
|
||||||
fun `check node runs inclusive of notary node schema set using driverDSL`() {
|
fun `check node runs inclusive of notary node schema set using driverDSL`() {
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
driver(DriverParameters(startNodesInProcess = true)) {
|
||||||
val notaryNode = defaultNotaryNode.getOrThrow().rpc.startFlow(::MappedSchemasFlow)
|
val notary = defaultNotaryNode.getOrThrow()
|
||||||
val mappedSchemas = notaryNode.returnValue.getOrThrow()
|
val mappedSchemas = notary.rpc.startFlow(::MappedSchemasFlow).returnValue.getOrThrow()
|
||||||
// check against NodeCore + NodeNotary Schemas
|
// check against NodeCore + NodeNotary Schemas
|
||||||
assertTrue(mappedSchemas.contains(NodeCoreV1.name))
|
assertTrue(mappedSchemas.contains(NodeCoreV1.name))
|
||||||
assertTrue(mappedSchemas.contains(NodeNotaryV1.name))
|
assertTrue(mappedSchemas.contains(NodeNotaryV1.name))
|
||||||
|
@ -19,15 +19,14 @@ import net.corda.core.internal.packageName
|
|||||||
import net.corda.core.node.services.*
|
import net.corda.core.node.services.*
|
||||||
import net.corda.core.node.services.vault.*
|
import net.corda.core.node.services.vault.*
|
||||||
import net.corda.core.node.services.vault.QueryCriteria.*
|
import net.corda.core.node.services.vault.QueryCriteria.*
|
||||||
import net.corda.core.utilities.NonEmptySet
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
import net.corda.core.utilities.days
|
import net.corda.core.utilities.*
|
||||||
import net.corda.core.utilities.seconds
|
|
||||||
import net.corda.core.utilities.toHexString
|
|
||||||
import net.corda.finance.*
|
import net.corda.finance.*
|
||||||
import net.corda.finance.contracts.CommercialPaper
|
import net.corda.finance.contracts.CommercialPaper
|
||||||
import net.corda.finance.contracts.Commodity
|
import net.corda.finance.contracts.Commodity
|
||||||
import net.corda.finance.contracts.DealState
|
import net.corda.finance.contracts.DealState
|
||||||
import net.corda.finance.contracts.asset.Cash
|
import net.corda.finance.contracts.asset.Cash
|
||||||
|
import net.corda.finance.contracts.asset.cash.selection.AbstractCashSelection
|
||||||
import net.corda.finance.sampleschemas.SampleCashSchemaV3
|
import net.corda.finance.sampleschemas.SampleCashSchemaV3
|
||||||
import net.corda.finance.schemas.CashSchemaV1
|
import net.corda.finance.schemas.CashSchemaV1
|
||||||
import net.corda.finance.schemas.CashSchemaV1.PersistentCashState
|
import net.corda.finance.schemas.CashSchemaV1.PersistentCashState
|
||||||
@ -52,7 +51,6 @@ import java.time.LocalDate
|
|||||||
import java.time.ZoneOffset
|
import java.time.ZoneOffset
|
||||||
import java.time.temporal.ChronoUnit
|
import java.time.temporal.ChronoUnit
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import kotlin.test.assertTrue
|
|
||||||
|
|
||||||
open class VaultQueryTests {
|
open class VaultQueryTests {
|
||||||
private companion object {
|
private companion object {
|
||||||
@ -2064,6 +2062,39 @@ open class VaultQueryTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `unconsumedCashStatesForSpending_single_issuer_reference`() {
|
||||||
|
database.transaction {
|
||||||
|
vaultFiller.fillWithSomeTestCash(1000.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER)
|
||||||
|
}
|
||||||
|
database.transaction {
|
||||||
|
val builder = TransactionBuilder()
|
||||||
|
val issuer = DUMMY_CASH_ISSUER
|
||||||
|
val exitStates = AbstractCashSelection
|
||||||
|
.getInstance { services.jdbcSession().metaData }
|
||||||
|
.unconsumedCashStatesForSpending(services, 300.DOLLARS, setOf(issuer.party),
|
||||||
|
builder.notary, builder.lockId, setOf(issuer.reference))
|
||||||
|
|
||||||
|
assertThat(exitStates).hasSize(1)
|
||||||
|
assertThat(exitStates[0].state.data.amount.quantity).isEqualTo(100000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `unconsumedCashStatesForSpending_single_issuer_reference_not_matching`() {
|
||||||
|
database.transaction {
|
||||||
|
vaultFiller.fillWithSomeTestCash(1000.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER)
|
||||||
|
}
|
||||||
|
database.transaction {
|
||||||
|
val builder = TransactionBuilder()
|
||||||
|
val issuer = DUMMY_CASH_ISSUER
|
||||||
|
val exitStates = AbstractCashSelection
|
||||||
|
.getInstance { services.jdbcSession().metaData }
|
||||||
|
.unconsumedCashStatesForSpending(services, 300.DOLLARS, setOf(issuer.party),
|
||||||
|
builder.notary, builder.lockId, setOf(OpaqueBytes.of(13)))
|
||||||
|
assertThat(exitStates).hasSize(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* USE CASE demonstrations (outside of mainline Corda)
|
* USE CASE demonstrations (outside of mainline Corda)
|
||||||
*
|
*
|
||||||
|
@ -54,7 +54,6 @@ class TraderDemoTest : IntegrationTest() {
|
|||||||
startFlow<CommercialPaperIssueFlow>(),
|
startFlow<CommercialPaperIssueFlow>(),
|
||||||
all()))
|
all()))
|
||||||
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||||
defaultNotaryNode.getOrThrow()
|
|
||||||
val (nodeA, nodeB, bankNode) = listOf(
|
val (nodeA, nodeB, bankNode) = listOf(
|
||||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
|
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
|
||||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
|
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
|
||||||
|
@ -21,6 +21,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
|||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.node.internal.NodeStartup
|
import net.corda.node.internal.NodeStartup
|
||||||
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
|
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
|
||||||
|
import net.corda.testing.core.BOB_NAME
|
||||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||||
@ -89,9 +90,20 @@ class DriverTests : IntegrationTest() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `default notary is visible when the startNode future completes`() {
|
||||||
|
// Based on local testing, running this 3 times gives us a high confidence that we'll spot if the feature is not working
|
||||||
|
repeat(3) {
|
||||||
|
driver(DriverParameters(startNodesInProcess = true)) {
|
||||||
|
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||||
|
assertThat(bob.rpc.networkMapSnapshot().flatMap { it.legalIdentities }).contains(defaultNotaryIdentity)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `random free port allocation`() {
|
fun `random free port allocation`() {
|
||||||
val nodeHandle = driver(DriverParameters(portAllocation = RandomFree)) {
|
val nodeHandle = driver(DriverParameters(portAllocation = RandomFree, notarySpecs = emptyList())) {
|
||||||
val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME)
|
val nodeInfo = startNode(providedName = DUMMY_BANK_A_NAME)
|
||||||
nodeMustBeUp(nodeInfo)
|
nodeMustBeUp(nodeInfo)
|
||||||
}
|
}
|
||||||
@ -103,10 +115,7 @@ class DriverTests : IntegrationTest() {
|
|||||||
// Make sure we're using the log4j2 config which writes to the log file
|
// Make sure we're using the log4j2 config which writes to the log file
|
||||||
val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml"
|
val logConfigFile = projectRootDir / "config" / "dev" / "log4j2.xml"
|
||||||
assertThat(logConfigFile).isRegularFile()
|
assertThat(logConfigFile).isRegularFile()
|
||||||
driver(DriverParameters(
|
driver(DriverParameters(isDebug = true,notarySpecs = emptyList(), systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||||
isDebug = true,
|
|
||||||
systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())
|
|
||||||
)) {
|
|
||||||
val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory
|
val baseDirectory = startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow().baseDirectory
|
||||||
val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
|
val logFile = (baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
|
||||||
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
|
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
|
||||||
@ -116,7 +125,7 @@ class DriverTests : IntegrationTest() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() {
|
fun `monitoring mode enables jolokia exporting of JMX metrics via HTTP JSON`() {
|
||||||
driver(DriverParameters(startNodesInProcess = false, jmxPolicy = JmxPolicy(true))) {
|
driver(DriverParameters(startNodesInProcess = false, jmxPolicy = JmxPolicy(true), notarySpecs = emptyList())) {
|
||||||
// start another node so we gain access to node JMX metrics
|
// start another node so we gain access to node JMX metrics
|
||||||
val webAddress = NetworkHostAndPort("localhost", 7006)
|
val webAddress = NetworkHostAndPort("localhost", 7006)
|
||||||
startNode(providedName = DUMMY_REGULATOR_NAME,
|
startNode(providedName = DUMMY_REGULATOR_NAME,
|
||||||
@ -145,33 +154,32 @@ class DriverTests : IntegrationTest() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `driver rejects multiple nodes with the same name`() {
|
fun `driver rejects multiple nodes with the same name`() {
|
||||||
|
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
assertThatThrownBy {
|
||||||
|
listOf(
|
||||||
assertThatThrownBy { listOf(newNode(DUMMY_BANK_A_NAME)(), newNode(DUMMY_BANK_B_NAME)(), newNode(DUMMY_BANK_A_NAME)()).transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java)
|
newNode(DUMMY_BANK_A_NAME)(),
|
||||||
|
newNode(DUMMY_BANK_B_NAME)(),
|
||||||
|
newNode(DUMMY_BANK_A_NAME)()
|
||||||
|
).transpose().getOrThrow()
|
||||||
|
}.isInstanceOf(IllegalArgumentException::class.java)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `driver rejects multiple nodes with the same name parallel`() {
|
fun `driver rejects multiple nodes with the same name parallel`() {
|
||||||
|
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
|
||||||
|
|
||||||
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
|
val nodes = listOf(newNode(DUMMY_BANK_A_NAME), newNode(DUMMY_BANK_B_NAME), newNode(DUMMY_BANK_A_NAME))
|
||||||
|
assertThatThrownBy {
|
||||||
assertThatThrownBy { nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow() }.isInstanceOf(IllegalArgumentException::class.java)
|
nodes.parallelStream().map { it.invoke() }.toList().transpose().getOrThrow()
|
||||||
|
}.isInstanceOf(IllegalArgumentException::class.java)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `driver allows reusing names of nodes that have been stopped`() {
|
fun `driver allows reusing names of nodes that have been stopped`() {
|
||||||
|
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) {
|
||||||
driver(DriverParameters(startNodesInProcess = true)) {
|
|
||||||
|
|
||||||
val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow()
|
val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow()
|
||||||
|
|
||||||
nodeA.stop()
|
nodeA.stop()
|
||||||
|
|
||||||
assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException()
|
assertThatCode { newNode(DUMMY_BANK_A_NAME)().getOrThrow() }.doesNotThrowAnyException()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,8 @@ interface DriverDSL {
|
|||||||
* @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted
|
* @param maximumHeapSize The maximum JVM heap size to use for the node as a [String]. By default a number is interpreted
|
||||||
* as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate
|
* as being in bytes. Append the letter 'k' or 'K' to the value to indicate Kilobytes, 'm' or 'M' to indicate
|
||||||
* megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes.
|
* megabytes, and 'g' or 'G' to indicate gigabytes. The default value is "512m" = 512 megabytes.
|
||||||
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available.
|
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available and
|
||||||
|
* it sees all previously started nodes, including the notaries.
|
||||||
*/
|
*/
|
||||||
fun startNode(
|
fun startNode(
|
||||||
defaultParameters: NodeParameters = NodeParameters(),
|
defaultParameters: NodeParameters = NodeParameters(),
|
||||||
|
@ -29,14 +29,12 @@ import net.corda.core.messaging.CordaRPCOps
|
|||||||
import net.corda.core.node.NetworkParameters
|
import net.corda.core.node.NetworkParameters
|
||||||
import net.corda.core.node.NotaryInfo
|
import net.corda.core.node.NotaryInfo
|
||||||
import net.corda.core.node.services.NetworkMapCache
|
import net.corda.core.node.services.NetworkMapCache
|
||||||
import net.corda.core.toFuture
|
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.core.utilities.millis
|
import net.corda.core.utilities.millis
|
||||||
import net.corda.node.NodeRegistrationOption
|
import net.corda.node.NodeRegistrationOption
|
||||||
import net.corda.node.internal.Node
|
import net.corda.node.internal.Node
|
||||||
import net.corda.node.internal.NodeStartup
|
|
||||||
import net.corda.node.internal.StartedNode
|
import net.corda.node.internal.StartedNode
|
||||||
import net.corda.node.services.Permissions
|
import net.corda.node.services.Permissions
|
||||||
import net.corda.node.services.config.*
|
import net.corda.node.services.config.*
|
||||||
@ -67,8 +65,7 @@ import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_
|
|||||||
import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT
|
import net.corda.testing.node.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT
|
||||||
import okhttp3.OkHttpClient
|
import okhttp3.OkHttpClient
|
||||||
import okhttp3.Request
|
import okhttp3.Request
|
||||||
import rx.Observable
|
import rx.Subscription
|
||||||
import rx.observables.ConnectableObservable
|
|
||||||
import rx.schedulers.Schedulers
|
import rx.schedulers.Schedulers
|
||||||
import java.lang.management.ManagementFactory
|
import java.lang.management.ManagementFactory
|
||||||
import java.net.ConnectException
|
import java.net.ConnectException
|
||||||
@ -83,11 +80,10 @@ import java.time.Instant
|
|||||||
import java.time.ZoneOffset.UTC
|
import java.time.ZoneOffset.UTC
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import kotlin.collections.HashMap
|
||||||
import kotlin.concurrent.thread
|
import kotlin.concurrent.thread
|
||||||
import net.corda.nodeapi.internal.config.User as InternalUser
|
import net.corda.nodeapi.internal.config.User as InternalUser
|
||||||
|
|
||||||
@ -112,12 +108,11 @@ class DriverDSLImpl(
|
|||||||
override val shutdownManager get() = _shutdownManager!!
|
override val shutdownManager get() = _shutdownManager!!
|
||||||
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
|
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
|
||||||
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
|
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
|
||||||
private val countObservables = ConcurrentHashMap<CordaX500Name, Observable<Int>>()
|
private val networkVisibilityController = NetworkVisibilityController()
|
||||||
private val nodeNames = mutableSetOf<CordaX500Name>()
|
|
||||||
/**
|
/**
|
||||||
* Future which completes when the network map is available, whether a local one or one from the CZ. This future acts
|
* Future which completes when the network map infrastructure is available, whether a local one or one from the CZ.
|
||||||
* as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which
|
* This future acts as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap]
|
||||||
* is null if the network map is being provided by the CZ.
|
* object, which is null if the network map is being provided by the CZ.
|
||||||
*/
|
*/
|
||||||
private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
|
private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
|
||||||
private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
|
private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
|
||||||
@ -130,13 +125,9 @@ class DriverDSLImpl(
|
|||||||
private val state = ThreadBox(State())
|
private val state = ThreadBox(State())
|
||||||
|
|
||||||
//TODO: remove this once we can bundle quasar properly.
|
//TODO: remove this once we can bundle quasar properly.
|
||||||
private val quasarJarPath: String by lazy {
|
private val quasarJarPath: String by lazy { resolveJar(".*quasar.*\\.jar$") }
|
||||||
resolveJar(".*quasar.*\\.jar$")
|
|
||||||
}
|
|
||||||
|
|
||||||
private val jolokiaJarPath: String by lazy {
|
private val jolokiaJarPath: String by lazy { resolveJar(".*jolokia-jvm-.*-agent\\.jar$") }
|
||||||
resolveJar(".*jolokia-jvm-.*-agent\\.jar$")
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun resolveJar(jarNamePattern: String): String {
|
private fun resolveJar(jarNamePattern: String): String {
|
||||||
return try {
|
return try {
|
||||||
@ -199,12 +190,7 @@ class DriverDSLImpl(
|
|||||||
val p2pAddress = portAllocation.nextHostAndPort()
|
val p2pAddress = portAllocation.nextHostAndPort()
|
||||||
// TODO: Derive name from the full picked name, don't just wrap the common name
|
// TODO: Derive name from the full picked name, don't just wrap the common name
|
||||||
val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
|
val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
|
||||||
synchronized(nodeNames) {
|
|
||||||
val wasANewNode = nodeNames.add(name)
|
|
||||||
if (!wasANewNode) {
|
|
||||||
throw IllegalArgumentException("Node with name $name is already started or starting.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
val registrationFuture = if (compatibilityZone?.rootCert != null) {
|
val registrationFuture = if (compatibilityZone?.rootCert != null) {
|
||||||
// We don't need the network map to be available to be able to register the node
|
// We don't need the network map to be available to be able to register the node
|
||||||
startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
|
startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
|
||||||
@ -272,14 +258,20 @@ class DriverDSLImpl(
|
|||||||
|
|
||||||
return if (startNodesInProcess) {
|
return if (startNodesInProcess) {
|
||||||
executorService.fork {
|
executorService.fork {
|
||||||
NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL), NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)).buildKeystore()
|
NetworkRegistrationHelper(
|
||||||
|
config.corda,
|
||||||
|
HTTPNetworkRegistrationService(compatibilityZoneURL),
|
||||||
|
NodeRegistrationOption(rootTruststorePath, rootTruststorePassword)
|
||||||
|
).buildKeystore()
|
||||||
config
|
config
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
startOutOfProcessMiniNode(config,
|
startOutOfProcessMiniNode(
|
||||||
|
config,
|
||||||
"--initial-registration",
|
"--initial-registration",
|
||||||
"--network-root-truststore=${rootTruststorePath.toAbsolutePath()}",
|
"--network-root-truststore=${rootTruststorePath.toAbsolutePath()}",
|
||||||
"--network-root-truststore-password=$rootTruststorePassword").map { config }
|
"--network-root-truststore-password=$rootTruststorePassword"
|
||||||
|
).map { config }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -581,54 +573,6 @@ class DriverDSLImpl(
|
|||||||
return driverDirectory / nodeDirectoryName
|
return driverDirectory / nodeDirectoryName
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @nodeName the name of the node which performs counting
|
|
||||||
* @param initial number of nodes currently in the network map of a running node.
|
|
||||||
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
|
|
||||||
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
|
|
||||||
* the initial value emitted is always [initial]
|
|
||||||
*/
|
|
||||||
private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
|
|
||||||
ConnectableObservable<Int> {
|
|
||||||
val count = AtomicInteger(initial)
|
|
||||||
return networkMapCacheChangeObservable.map {
|
|
||||||
log.debug("nodeCountObservable for '$nodeName' received '$it'")
|
|
||||||
when (it) {
|
|
||||||
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
|
|
||||||
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
|
|
||||||
is NetworkMapCache.MapChange.Modified -> count.get()
|
|
||||||
}
|
|
||||||
}.startWith(initial).replay()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param rpc the [CordaRPCOps] of a newly started node.
|
|
||||||
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
|
|
||||||
* equal to the number of running nodes. The future will yield the number of connected nodes.
|
|
||||||
*/
|
|
||||||
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
|
|
||||||
val (snapshot, updates) = rpc.networkMapFeed()
|
|
||||||
val nodeName = rpc.nodeInfo().legalIdentities[0].name
|
|
||||||
val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates)
|
|
||||||
countObservables[nodeName] = counterObservable
|
|
||||||
/* TODO: this might not always be the exact number of nodes one has to wait for,
|
|
||||||
* for example in the following sequence
|
|
||||||
* 1 start 3 nodes in order, A, B, C.
|
|
||||||
* 2 before the future returned by this function resolves, kill B
|
|
||||||
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
|
|
||||||
*/
|
|
||||||
val requiredNodes = countObservables.size
|
|
||||||
|
|
||||||
// This is an observable which yield the minimum number of nodes in each node network map.
|
|
||||||
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
|
|
||||||
log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}")
|
|
||||||
args.map { it as Int }.min() ?: 0
|
|
||||||
}
|
|
||||||
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
|
|
||||||
counterObservable.connect()
|
|
||||||
return future
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the node with the given flag which is expected to start the node for some function, which once complete will
|
* Start the node with the given flag which is expected to start the node for some function, which once complete will
|
||||||
* terminate the node.
|
* terminate the node.
|
||||||
@ -658,16 +602,14 @@ class DriverDSLImpl(
|
|||||||
startInProcess: Boolean?,
|
startInProcess: Boolean?,
|
||||||
maximumHeapSize: String,
|
maximumHeapSize: String,
|
||||||
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> {
|
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> {
|
||||||
|
val visibilityHandle = networkVisibilityController.register(config.corda.myLegalName)
|
||||||
val baseDirectory = config.corda.baseDirectory.createDirectories()
|
val baseDirectory = config.corda.baseDirectory.createDirectories()
|
||||||
localNetworkMap?.networkParametersCopier?.install(baseDirectory)
|
localNetworkMap?.networkParametersCopier?.install(baseDirectory)
|
||||||
localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory)
|
localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory)
|
||||||
|
|
||||||
val onNodeExit: () -> Unit = {
|
val onNodeExit: () -> Unit = {
|
||||||
localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory)
|
localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory)
|
||||||
countObservables.remove(config.corda.myLegalName)
|
visibilityHandle.close()
|
||||||
synchronized(nodeNames) {
|
|
||||||
nodeNames.remove(config.corda.myLegalName)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") }
|
val useHTTPS = config.typesafe.run { hasPath("useHTTPS") && getBoolean("useHTTPS") }
|
||||||
@ -684,7 +626,7 @@ class DriverDSLImpl(
|
|||||||
)
|
)
|
||||||
return nodeAndThreadFuture.flatMap { (node, thread) ->
|
return nodeAndThreadFuture.flatMap { (node, thread) ->
|
||||||
establishRpc(config, openFuture()).flatMap { rpc ->
|
establishRpc(config, openFuture()).flatMap { rpc ->
|
||||||
allNodesConnected(rpc).map {
|
visibilityHandle.listen(rpc).map {
|
||||||
InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node)
|
InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -707,12 +649,13 @@ class DriverDSLImpl(
|
|||||||
}
|
}
|
||||||
establishRpc(config, processDeathFuture).flatMap { rpc ->
|
establishRpc(config, processDeathFuture).flatMap { rpc ->
|
||||||
// Check for all nodes to have all other nodes in background in case RPC is failing over:
|
// Check for all nodes to have all other nodes in background in case RPC is failing over:
|
||||||
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
|
val networkMapFuture = executorService.fork { visibilityHandle.listen(rpc) }.flatMap { it }
|
||||||
firstOf(processDeathFuture, networkMapFuture) {
|
firstOf(processDeathFuture, networkMapFuture) {
|
||||||
if (it == processDeathFuture) {
|
if (it == processDeathFuture) {
|
||||||
throw ListenProcessDeathException(config.corda.p2pAddress, process)
|
throw ListenProcessDeathException(config.corda.p2pAddress, process)
|
||||||
}
|
}
|
||||||
// Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap.
|
// Will interrupt polling for process death as this is no longer relevant since the process been
|
||||||
|
// successfully started and reflected itself in the NetworkMap.
|
||||||
processDeathFuture.cancel(true)
|
processDeathFuture.cancel(true)
|
||||||
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
|
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
|
||||||
OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit)
|
OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit)
|
||||||
@ -731,7 +674,7 @@ class DriverDSLImpl(
|
|||||||
/**
|
/**
|
||||||
* The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories.
|
* The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories.
|
||||||
*/
|
*/
|
||||||
private inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) {
|
inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) {
|
||||||
val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos))
|
val networkParametersCopier = NetworkParametersCopier(networkParameters.copy(notaries = notaryInfos))
|
||||||
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
|
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
|
||||||
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
|
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
|
||||||
@ -743,12 +686,12 @@ class DriverDSLImpl(
|
|||||||
* Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration].
|
* Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration].
|
||||||
* Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration].
|
* Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration].
|
||||||
*/
|
*/
|
||||||
private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration().also { nodeConfiguration ->
|
private class NodeConfig(val typesafe: Config, val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()) {
|
||||||
val errors = nodeConfiguration.validate()
|
init {
|
||||||
if (errors.isNotEmpty()) {
|
val errors = corda.validate()
|
||||||
throw IllegalStateException("Invalid node configuration. Errors where:${System.lineSeparator()}${errors.joinToString(System.lineSeparator())}")
|
require(errors.isEmpty()) { "Invalid node configuration. Errors where:\n${errors.joinToString("\n")}" }
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
internal val log = contextLogger()
|
internal val log = contextLogger()
|
||||||
@ -916,6 +859,63 @@ class DriverDSLImpl(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Keeps track of how many nodes each node sees and gates nodes from completing their startNode [CordaFuture] until all
|
||||||
|
* current nodes see everyone.
|
||||||
|
*/
|
||||||
|
private class NetworkVisibilityController {
|
||||||
|
private val nodeVisibilityHandles = ThreadBox(HashMap<CordaX500Name, VisibilityHandle>())
|
||||||
|
|
||||||
|
fun register(name: CordaX500Name): VisibilityHandle {
|
||||||
|
val handle = VisibilityHandle()
|
||||||
|
nodeVisibilityHandles.locked {
|
||||||
|
require(putIfAbsent(name, handle) == null) { "Node with name $name is already started or starting" }
|
||||||
|
}
|
||||||
|
return handle
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun checkIfAllVisible() {
|
||||||
|
nodeVisibilityHandles.locked {
|
||||||
|
val minView = values.stream().mapToInt { it.visibleNodeCount }.min().orElse(0)
|
||||||
|
if (minView >= size) {
|
||||||
|
values.forEach { it.future.set(Unit) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
inner class VisibilityHandle : AutoCloseable {
|
||||||
|
internal val future = openFuture<Unit>()
|
||||||
|
internal var visibleNodeCount = 0
|
||||||
|
private var subscription: Subscription? = null
|
||||||
|
|
||||||
|
fun listen(rpc: CordaRPCOps): CordaFuture<Unit> {
|
||||||
|
check(subscription == null)
|
||||||
|
val (snapshot, updates) = rpc.networkMapFeed()
|
||||||
|
visibleNodeCount = snapshot.size
|
||||||
|
checkIfAllVisible()
|
||||||
|
subscription = updates.subscribe { when (it) {
|
||||||
|
is NetworkMapCache.MapChange.Added -> {
|
||||||
|
visibleNodeCount++
|
||||||
|
checkIfAllVisible()
|
||||||
|
}
|
||||||
|
is NetworkMapCache.MapChange.Removed -> {
|
||||||
|
visibleNodeCount--
|
||||||
|
checkIfAllVisible()
|
||||||
|
}
|
||||||
|
} }
|
||||||
|
return future
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
subscription?.unsubscribe()
|
||||||
|
nodeVisibilityHandles.locked {
|
||||||
|
values -= this@VisibilityHandle
|
||||||
|
checkIfAllVisible()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
interface InternalDriverDSL : DriverDSL, CordformContext {
|
interface InternalDriverDSL : DriverDSL, CordformContext {
|
||||||
private companion object {
|
private companion object {
|
||||||
private val DEFAULT_POLL_INTERVAL = 500.millis
|
private val DEFAULT_POLL_INTERVAL = 500.millis
|
||||||
|
Loading…
Reference in New Issue
Block a user