Internal driver now also does the registration for the notaries. (#2304)

Using the --just-generate-node-info flag for the notary nodes so that their identities can be submitted to the network map server, which does the network parameters generation.
This commit is contained in:
Shams Asari 2018-01-02 15:12:30 +00:00 committed by GitHub
parent fe3c2b3983
commit 730fec2eb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 370 additions and 261 deletions

View File

@ -14,7 +14,6 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.crypto.*
import org.slf4j.LoggerFactory
import java.nio.file.Path
import java.security.cert.X509Certificate
/**
* Contains utility methods for generating identities for a node.
@ -29,11 +28,8 @@ object DevIdentityGenerator {
const val NODE_IDENTITY_ALIAS_PREFIX = "identity"
const val DISTRIBUTED_NOTARY_ALIAS_PREFIX = "distributed-notary"
/**
* Install a node key store for the given node directory using the given legal name and an optional root cert. If no
* root cert is specified then the default one in certificates/cordadevcakeys.jks is used.
*/
fun installKeyStoreWithNodeIdentity(nodeDir: Path, legalName: CordaX500Name, customRootCert: X509Certificate? = null): Party {
/** Install a node key store for the given node directory using the given legal name. */
fun installKeyStoreWithNodeIdentity(nodeDir: Path, legalName: CordaX500Name): Party {
val nodeSslConfig = object : NodeSSLConfiguration {
override val baseDirectory = nodeDir
override val keyStorePassword: String = "cordacadevpass"
@ -43,8 +39,7 @@ object DevIdentityGenerator {
// TODO The passwords for the dev key stores are spread everywhere and should be constants in a single location
val caKeyStore = loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/cordadevcakeys.jks"), "cordacadevpass")
val intermediateCa = caKeyStore.getCertificateAndKeyPair(X509Utilities.CORDA_INTERMEDIATE_CA, "cordacadevkeypass")
// TODO If using a custom root cert, then the intermidate cert needs to be generated from it as well, and not taken from the default
val rootCert = customRootCert ?: caKeyStore.getCertificate(X509Utilities.CORDA_ROOT_CA)
val rootCert = caKeyStore.getCertificate(X509Utilities.CORDA_ROOT_CA)
nodeSslConfig.certificatesDirectory.createDirectories()
nodeSslConfig.createDevKeyStores(rootCert.toX509CertHolder(), intermediateCa, legalName)
@ -54,7 +49,7 @@ object DevIdentityGenerator {
return identity.party
}
fun generateDistributedNotaryIdentity(dirs: List<Path>, notaryName: CordaX500Name, threshold: Int = 1, customRootCert: X509Certificate? = null): Party {
fun generateDistributedNotaryIdentity(dirs: List<Path>, notaryName: CordaX500Name, threshold: Int = 1): Party {
require(dirs.isNotEmpty())
log.trace { "Generating identity \"$notaryName\" for nodes: ${dirs.joinToString()}" }
@ -63,8 +58,7 @@ object DevIdentityGenerator {
val caKeyStore = loadKeyStore(javaClass.classLoader.getResourceAsStream("certificates/cordadevcakeys.jks"), "cordacadevpass")
val intermediateCa = caKeyStore.getCertificateAndKeyPair(X509Utilities.CORDA_INTERMEDIATE_CA, "cordacadevkeypass")
// TODO If using a custom root cert, then the intermidate cert needs to be generated from it as well, and not taken from the default
val rootCert = customRootCert ?: caKeyStore.getCertificate(X509Utilities.CORDA_ROOT_CA)
val rootCert = caKeyStore.getCertificate(X509Utilities.CORDA_ROOT_CA)
keyPairs.zip(dirs) { keyPair, nodeDir ->
val (serviceKeyCert, compositeKeyCert) = listOf(keyPair.public, compositeKey).map { publicKey ->

View File

@ -4,12 +4,8 @@ import net.corda.core.CordaOID
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SignatureScheme
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.CertRole
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.cert
import net.corda.core.internal.reader
import net.corda.core.internal.writer
import net.corda.core.internal.x500Name
import net.corda.core.internal.*
import net.corda.core.utilities.days
import net.corda.core.utilities.millis
import org.bouncycastle.asn1.*
@ -43,6 +39,7 @@ object X509Utilities {
val DEFAULT_IDENTITY_SIGNATURE_SCHEME = Crypto.EDDSA_ED25519_SHA512
val DEFAULT_TLS_SIGNATURE_SCHEME = Crypto.ECDSA_SECP256R1_SHA256
// TODO This class is more of a general purpose utility class and as such these constants belong elsewhere
// Aliases for private keys and certificates.
const val CORDA_ROOT_CA = "cordarootca"
const val CORDA_INTERMEDIATE_CA = "cordaintermediateca"

View File

@ -170,10 +170,10 @@ class X509UtilitiesTest {
override val trustStorePassword = "trustpass"
}
val (rootCert, intermediateCa) = createDevIntermediateCaCertPath()
val (rootCa, intermediateCa) = createDevIntermediateCaCertPath()
// Generate server cert and private key and populate another keystore suitable for SSL
sslConfig.createDevKeyStores(rootCert.certificate, intermediateCa, MEGA_CORP.name)
sslConfig.createDevKeyStores(rootCa.certificate, intermediateCa, MEGA_CORP.name)
// Load back server certificate
val serverKeyStore = loadKeyStore(sslConfig.nodeKeystore, sslConfig.keyStorePassword)
@ -206,11 +206,11 @@ class X509UtilitiesTest {
override val trustStorePassword = "trustpass"
}
val (rootCert, intermediateCa) = createDevIntermediateCaCertPath()
val (rootCa, intermediateCa) = createDevIntermediateCaCertPath()
// Generate server cert and private key and populate another keystore suitable for SSL
sslConfig.createDevKeyStores(rootCert.certificate, intermediateCa, MEGA_CORP.name)
sslConfig.createTrustStore(rootCert.certificate.cert)
sslConfig.createDevKeyStores(rootCa.certificate, intermediateCa, MEGA_CORP.name)
sslConfig.createTrustStore(rootCa.certificate.cert)
val keyStore = loadKeyStore(sslConfig.sslKeystore, sslConfig.keyStorePassword)
val trustStore = loadKeyStore(sslConfig.trustStoreFile, sslConfig.trustStorePassword)

View File

@ -1,6 +1,11 @@
package net.corda.node.services.network
import net.corda.cordform.CordformNode
import net.corda.core.crypto.SignedData
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.list
import net.corda.core.internal.readAll
import net.corda.core.node.NodeInfo
@ -10,9 +15,9 @@ import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NetworkParameters
import net.corda.testing.ALICE_NAME
import net.corda.testing.ROOT_CA
import net.corda.testing.BOB_NAME
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.internal.CompatibilityZoneParams
@ -24,12 +29,15 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.net.URL
import java.time.Instant
import kotlin.streams.toList
import kotlin.test.assertEquals
class NetworkMapTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val cacheTimeout = 1.seconds
private val portAllocation = PortAllocation.Incremental(10000)
@ -40,7 +48,9 @@ class NetworkMapTest {
fun start() {
networkMapServer = NetworkMapServer(cacheTimeout, portAllocation.nextHostAndPort())
val address = networkMapServer.start()
compatibilityZone = CompatibilityZoneParams(URL("http://$address"))
compatibilityZone = CompatibilityZoneParams(URL("http://$address"), publishNotaries = {
networkMapServer.networkParameters = testNetworkParameters(it, modifiedTime = Instant.ofEpochMilli(random63BitValue()))
})
}
@After
@ -50,27 +60,35 @@ class NetworkMapTest {
@Test
fun `node correctly downloads and saves network parameters file on startup`() {
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone,
initialiseSerialization = false, notarySpecs = emptyList()) {
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
initialiseSerialization = false,
notarySpecs = emptyList()
) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val networkParameters = alice.configuration.baseDirectory
.list { paths -> paths.filter { it.fileName.toString() == NETWORK_PARAMS_FILE_NAME }.findFirst().get() }
val networkParameters = (alice.configuration.baseDirectory / NETWORK_PARAMS_FILE_NAME)
.readAll()
.deserialize<SignedData<NetworkParameters>>()
.verified()
// We use a random modified time above to make the network parameters unqiue so that we're sure they came
// from the server
assertEquals(networkMapServer.networkParameters, networkParameters)
}
}
@Test
fun `nodes can see each other using the http network map`() {
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone,
initialiseSerialization = false, onNetworkParametersGeneration = { networkMapServer.networkParameters = it }) {
val alice = startNode(providedName = ALICE_NAME)
val bob = startNode(providedName = BOB_NAME)
val notaryNode = defaultNotaryNode.get()
val aliceNode = alice.get()
val bobNode = bob.get()
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
initialiseSerialization = false
) {
val (aliceNode, bobNode, notaryNode) = listOf(
startNode(providedName = ALICE_NAME),
startNode(providedName = BOB_NAME),
defaultNotaryNode
).transpose().getOrThrow()
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
@ -80,16 +98,20 @@ class NetworkMapTest {
@Test
fun `nodes process network map add updates correctly when adding new node to network map`() {
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone,
initialiseSerialization = false, onNetworkParametersGeneration = { networkMapServer.networkParameters = it }) {
val alice = startNode(providedName = ALICE_NAME)
val notaryNode = defaultNotaryNode.get()
val aliceNode = alice.get()
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
initialiseSerialization = false
) {
val (aliceNode, notaryNode) = listOf(
startNode(providedName = ALICE_NAME),
defaultNotaryNode
).transpose().getOrThrow()
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo)
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo)
val bob = startNode(providedName = BOB_NAME)
val bobNode = bob.get()
val bobNode = startNode(providedName = BOB_NAME).getOrThrow()
// Wait for network map client to poll for the next update.
Thread.sleep(cacheTimeout.toMillis() * 2)
@ -102,13 +124,16 @@ class NetworkMapTest {
@Test
fun `nodes process network map remove updates correctly`() {
internalDriver(portAllocation = portAllocation, compatibilityZone = compatibilityZone,
initialiseSerialization = false, onNetworkParametersGeneration = { networkMapServer.networkParameters = it }) {
val alice = startNode(providedName = ALICE_NAME)
val bob = startNode(providedName = BOB_NAME)
val notaryNode = defaultNotaryNode.get()
val aliceNode = alice.get()
val bobNode = bob.get()
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
initialiseSerialization = false
) {
val (aliceNode, bobNode, notaryNode) = listOf(
startNode(providedName = ALICE_NAME),
startNode(providedName = BOB_NAME),
defaultNotaryNode
).transpose().getOrThrow()
notaryNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
aliceNode.onlySees(notaryNode.nodeInfo, aliceNode.nodeInfo, bobNode.nodeInfo)
@ -124,5 +149,12 @@ class NetworkMapTest {
}
}
private fun NodeHandle.onlySees(vararg nodes: NodeInfo) = assertThat(rpc.networkMapSnapshot()).containsOnly(*nodes)
private fun NodeHandle.onlySees(vararg nodes: NodeInfo) {
// Make sure the nodes aren't getting the node infos from their additional directories
val nodeInfosDir = configuration.baseDirectory / CordformNode.NODE_INFO_DIRECTORY
if (nodeInfosDir.exists()) {
assertThat(nodeInfosDir.list { it.toList() }).isEmpty()
}
assertThat(rpc.networkMapSnapshot()).containsOnly(*nodes)
}
}

View File

@ -21,6 +21,7 @@ import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_INTERMEDIATE_CA
import net.corda.nodeapi.internal.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.testing.ROOT_CA
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.CompatibilityZoneParams
@ -31,10 +32,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.bouncycastle.pkcs.PKCS10CertificationRequest
import org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequest
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.*
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.net.URL
@ -49,6 +47,12 @@ import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
class NodeRegistrationTest {
companion object {
private val notaryName = CordaX500Name("NotaryService", "Zurich", "CH")
private val aliceName = CordaX500Name("Alice", "London", "GB")
private val genevieveName = CordaX500Name("Genevieve", "London", "GB")
}
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
@ -72,37 +76,30 @@ class NodeRegistrationTest {
@Test
fun `node registration correct root cert`() {
val compatibilityZone = CompatibilityZoneParams(URL("http://$serverHostAndPort"), rootCert = ROOT_CA.certificate.cert)
val compatibilityZone = CompatibilityZoneParams(
URL("http://$serverHostAndPort"),
publishNotaries = { server.networkParameters = testNetworkParameters(it) },
rootCert = ROOT_CA.certificate.cert)
internalDriver(
portAllocation = portAllocation,
compatibilityZone = compatibilityZone,
initialiseSerialization = false,
notarySpecs = listOf(NotarySpec(CordaX500Name("NotaryService", "Zurich", "CH"), validating = false)),
extraCordappPackagesToScan = listOf("net.corda.finance"),
onNetworkParametersGeneration = { server.networkParameters = it }
notarySpecs = listOf(NotarySpec(notaryName)),
extraCordappPackagesToScan = listOf("net.corda.finance")
) {
val aliceName = "Alice"
val genevieveName = "Genevieve"
val nodes = listOf(
startNode(providedName = CordaX500Name(aliceName, "London", "GB")),
startNode(providedName = CordaX500Name(genevieveName, "London", "GB")),
startNode(providedName = aliceName),
startNode(providedName = genevieveName),
defaultNotaryNode
).transpose().getOrThrow()
val (alice, genevieve) = nodes
assertThat(registrationHandler.idsPolled).contains(aliceName, genevieveName)
// Notary identities are generated beforehand hence notary nodes don't go through registration.
// This test isn't specifically testing this, or relying on this behavior, though if this check fail,
// this will probably lead to the rest of the test to fail.
assertThat(registrationHandler.idsPolled).doesNotContain("NotaryService")
assertThat(registrationHandler.idsPolled).containsOnly(
aliceName.organisation,
genevieveName.organisation,
notaryName.organisation)
// Check each node has each other identity in their network map cache.
for (node in nodes) {
assertThat(node.rpc.networkMapSnapshot()).containsOnlyElementsOf(nodes.map { it.nodeInfo })
}
// Check we nodes communicate among themselves (and the notary).
// Check the nodes can communicate among themselves (and the notary).
val anonymous = false
genevieve.rpc.startFlow(
::CashIssueAndPaymentFlow,
@ -120,20 +117,22 @@ class NodeRegistrationTest {
val someRootCert = X509Utilities.createSelfSignedCACertificate(
CordaX500Name("Integration Test Corda Node Root CA", "R3 Ltd", "London", "GB"),
Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME))
val compatibilityZone = CompatibilityZoneParams(URL("http://$serverHostAndPort"), rootCert = someRootCert.cert)
val compatibilityZone = CompatibilityZoneParams(
URL("http://$serverHostAndPort"),
publishNotaries = { server.networkParameters = testNetworkParameters(it) },
rootCert = someRootCert.cert)
internalDriver(
portAllocation = portAllocation,
notarySpecs = emptyList(),
compatibilityZone = compatibilityZone,
initialiseSerialization = false,
notarySpecs = listOf(NotarySpec(notaryName)),
startNodesInProcess = true // We need to run the nodes in the same process so that we can capture the correct exception
) {
assertThatThrownBy {
startNode(providedName = CordaX500Name("Alice", "London", "GB")).getOrThrow()
defaultNotaryNode.getOrThrow()
}.isInstanceOf(CertPathValidatorException::class.java)
}
}
}
@Path("certificate")
@ -150,6 +149,7 @@ class RegistrationHandler(private val rootCertAndKeyPair: CertificateAndKeyPair)
certificationRequest,
rootCertAndKeyPair.keyPair,
arrayOf(rootCertAndKeyPair.certificate.cert))
require(!name.organisation.contains("\\s".toRegex())) { "Whitespace in the organisation name not supported" }
certPaths[name.organisation] = certPath
return Response.ok(name.organisation).build()
}

View File

@ -180,13 +180,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
return SignedNodeInfo(serialised, listOf(signature))
}
open fun generateNodeInfo() {
open fun generateAndSaveNodeInfo(): NodeInfo {
check(started == null) { "Node has already been started" }
log.info("Generating nodeInfo ...")
initCertificate()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
// a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
@ -196,6 +196,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
info
}
}

View File

@ -19,7 +19,9 @@ import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AddressUtils
@ -32,6 +34,7 @@ import net.corda.nodeapi.internal.serialization.*
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Scheduler
import rx.schedulers.Schedulers
import java.time.Clock
import java.util.concurrent.atomic.AtomicInteger
@ -264,15 +267,13 @@ open class Node(configuration: NodeConfiguration,
private val _startupComplete = openFuture<Unit>()
val startupComplete: CordaFuture<Unit> get() = _startupComplete
override fun generateNodeInfo() {
override fun generateAndSaveNodeInfo(): NodeInfo {
initialiseSerialization()
super.generateNodeInfo()
return super.generateAndSaveNodeInfo()
}
override fun start(): StartedNode<Node> {
if (initialiseSerialization) {
initialiseSerialization()
}
initialiseSerialization()
val started: StartedNode<Node> = uncheckedCast(super.start())
nodeReadyFuture.thenMatch({
serverThread.execute {
@ -305,8 +306,10 @@ open class Node(configuration: NodeConfiguration,
return started
}
override fun getRxIoScheduler() = Schedulers.io()!!
override fun getRxIoScheduler(): Scheduler = Schedulers.io()
private fun initialiseSerialization() {
if (!initialiseSerialization) return
val classloader = cordappLoader.appClassLoader
nodeSerializationEnv = SerializationEnvironmentImpl(
SerializationFactoryImpl().apply {

View File

@ -1,7 +1,6 @@
package net.corda.node.internal
import com.jcabi.manifests.Manifests
import com.typesafe.config.ConfigException
import joptsimple.OptionException
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.thenMatch
@ -126,7 +125,7 @@ open class NodeStartup(val args: Array<String>) {
val node = createNode(conf, versionInfo)
if (cmdlineOptions.justGenerateNodeInfo) {
// Perform the minimum required start-up logic to be able to write a nodeInfo to disk
node.generateNodeInfo()
node.generateAndSaveNodeInfo()
return
}
val startedNode = node.start()

View File

@ -5,7 +5,10 @@ import net.corda.core.crypto.sha256
import net.corda.core.internal.cert
import net.corda.core.serialization.serialize
import net.corda.core.utilities.seconds
import net.corda.testing.*
import net.corda.testing.ALICE_NAME
import net.corda.testing.BOB_NAME
import net.corda.testing.DEV_TRUST_ROOT
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.createNodeInfoAndSigned
import net.corda.testing.node.internal.network.NetworkMapServer
@ -30,7 +33,7 @@ class NetworkMapClientTest {
@Before
fun setUp() {
server = NetworkMapServer(cacheTimeout, PortAllocation.Incremental(10000).nextHostAndPort(), root_ca = ROOT_CA)
server = NetworkMapServer(cacheTimeout, PortAllocation.Incremental(10000).nextHostAndPort())
val hostAndPort = server.start()
networkMapClient = NetworkMapClient(URL("http://${hostAndPort.host}:${hostAndPort.port}"), DEV_TRUST_ROOT.cert)
}

View File

@ -14,11 +14,11 @@ import net.corda.node.internal.StartedNode
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.nodeapi.internal.config.User
import net.corda.testing.DUMMY_NOTARY_NAME
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.DriverDSLImpl
import net.corda.testing.node.internal.genericDriver
import net.corda.testing.node.internal.getTimestampAsDirectoryName
import net.corda.testing.DUMMY_NOTARY_NAME
import net.corda.testing.node.NotarySpec
import java.net.InetSocketAddress
import java.net.ServerSocket
import java.nio.file.Path
@ -193,8 +193,7 @@ fun <A> driver(
notarySpecs = notarySpecs,
extraCordappPackagesToScan = extraCordappPackagesToScan,
jmxPolicy = jmxPolicy,
compatibilityZone = null,
onNetworkParametersGeneration = { }
compatibilityZone = null
),
coerce = { it },
dsl = dsl,

View File

@ -11,13 +11,11 @@ import net.corda.cordform.CordformNode
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.*
import net.corda.core.internal.copyTo
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.serialization.deserialize
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
@ -31,6 +29,7 @@ import net.corda.node.services.config.*
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
import net.corda.node.utilities.registration.NetworkRegistrationHelper
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs
@ -39,7 +38,6 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.addOrReplaceCertificate
import net.corda.nodeapi.internal.crypto.loadOrCreateKeyStore
import net.corda.nodeapi.internal.crypto.save
import net.corda.nodeapi.internal.network.NetworkParameters
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.nodeapi.internal.network.NotaryInfo
@ -89,24 +87,23 @@ class DriverDSLImpl(
extraCordappPackagesToScan: List<String>,
val jmxPolicy: JmxPolicy,
val notarySpecs: List<NotarySpec>,
val compatibilityZone: CompatibilityZoneParams?,
val onNetworkParametersGeneration: (NetworkParameters) -> Unit
val compatibilityZone: CompatibilityZoneParams?
) : InternalDriverDSL {
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
private var nodeInfoFilesCopier: NodeInfoFilesCopier? = null
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
private lateinit var _notaries: List<NotaryHandle>
override val notaryHandles: List<NotaryHandle> get() = _notaries
private var networkParametersCopier: NetworkParametersCopier? = null
lateinit var networkParameters: NetworkParameters
/**
* Future which completes when the network map is available, whether a local one or one from the CZ. This future acts
* as a gate to prevent nodes from starting too early. The value of the future is a [LocalNetworkMap] object, which
* is null if the network map is being provided by the CZ.
*/
private lateinit var networkMapAvailability: CordaFuture<LocalNetworkMap?>
private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
override val notaryHandles: List<NotaryHandle> get() = _notaries.getOrThrow()
class State {
val processes = ArrayList<Process>()
@ -147,12 +144,12 @@ class DriverDSLImpl(
_executorService?.shutdownNow()
}
private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.rpcAddress!!
private fun establishRpc(config: NodeConfig, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.corda.rpcAddress!!
val client = CordaRPCClient(rpcAddress)
val connectionFuture = poll(executorService, "RPC connection") {
try {
client.start(config.rpcUsers[0].username, config.rpcUsers[0].password)
config.corda.rpcUsers[0].run { client.start(username, password) }
} catch (e: Exception) {
if (processDeathFuture.isDone) throw e
log.error("Exception $e, Retrying RPC connection at $rpcAddress")
@ -180,65 +177,74 @@ class DriverDSLImpl(
): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val isNotary = name in notarySpecs.map { it.name }
val registrationFuture = if (compatibilityZone?.rootCert != null && !isNotary) {
nodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
val name = providedName ?: CordaX500Name("${oneOf(names).organisation}-${p2pAddress.port}", "London", "GB")
val registrationFuture = if (compatibilityZone?.rootCert != null) {
// We don't need the network map to be available to be able to register the node
startNodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
} else {
doneFuture(Unit)
}
return registrationFuture.flatMap {
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val configMap = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = if (compatibilityZone != null) {
configMap + mapOf("compatibilityZoneURL" to compatibilityZone.url.toString())
} else {
configMap
}
)
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
networkMapAvailability.flatMap {
// But starting the node proper does require the network map
startRegisteredNode(name, it, rpcUsers, verifierType, customOverrides, startInSameProcess, maximumHeapSize, p2pAddress)
}
}
}
private fun nodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<Unit> {
private fun startRegisteredNode(name: CordaX500Name,
localNetworkMap: LocalNetworkMap?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean? = null,
maximumHeapSize: String = "200m",
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()): CordaFuture<NodeHandle> {
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val czUrlConfig = if (compatibilityZone != null) mapOf("compatibilityZoneURL" to compatibilityZone.url.toString()) else emptyMap()
val config = NodeConfig(ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + czUrlConfig + customOverrides
))
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize, localNetworkMap)
}
private fun startNodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<NodeConfig> {
val baseDirectory = baseDirectory(providedName).createDirectories()
val config = ConfigHelper.loadConfig(
val config = NodeConfig(ConfigHelper.loadConfig(
baseDirectory = baseDirectory,
allowMissingConfig = true,
configOverrides = configOf(
"p2pAddress" to "localhost:1222", // required argument, not really used
"compatibilityZoneURL" to compatibilityZoneURL.toString(),
"myLegalName" to providedName.toString())
)
val configuration = config.parseAsNodeConfiguration()
))
configuration.trustStoreFile.parent.createDirectories()
loadOrCreateKeyStore(configuration.trustStoreFile, configuration.trustStorePassword).also {
it.addOrReplaceCertificate(X509Utilities.CORDA_ROOT_CA, rootCert)
it.save(configuration.trustStoreFile, configuration.trustStorePassword)
config.corda.certificatesDirectory.createDirectories()
loadOrCreateKeyStore(config.corda.trustStoreFile, config.corda.trustStorePassword).apply {
addOrReplaceCertificate(X509Utilities.CORDA_ROOT_CA, rootCert)
save(config.corda.trustStoreFile, config.corda.trustStorePassword)
}
return if (startNodesInProcess) {
// This is a bit cheating, we're not starting a full node, we're just calling the code nodes call
// when registering.
NetworkRegistrationHelper(configuration, HTTPNetworkRegistrationService(compatibilityZoneURL)).buildKeystore()
doneFuture(Unit)
executorService.fork {
NetworkRegistrationHelper(config.corda, HTTPNetworkRegistrationService(compatibilityZoneURL)).buildKeystore()
config
}
} else {
startOutOfProcessNodeRegistration(config, configuration)
startOutOfProcessMiniNode(config, "--initial-registration").map { config }
}
}
@ -249,7 +255,9 @@ class DriverDSLImpl(
}
internal fun startCordformNodes(cordforms: List<CordformNode>): CordaFuture<*> {
check(compatibilityZone == null) { "Cordform nodes should be run without compatibilityZone configuration" }
check(notarySpecs.isEmpty()) { "Specify notaries in the CordformDefinition" }
check(compatibilityZone == null) { "Cordform nodes cannot be run with compatibilityZoneURL" }
val clusterNodes = HashMultimap.create<ClusterType, CordaX500Name>()
val notaryInfos = ArrayList<NotaryInfo>()
@ -281,12 +289,10 @@ class DriverDSLImpl(
notaryInfos += NotaryInfo(identity, type.validating)
}
networkParameters = testNetworkParameters(notaryInfos)
onNetworkParametersGeneration(networkParameters)
networkParametersCopier = NetworkParametersCopier(networkParameters)
val localNetworkMap = LocalNetworkMap(notaryInfos)
return cordforms.map {
val startedNode = startCordformNode(it)
val startedNode = startCordformNode(it, localNetworkMap)
if (it.webAddress != null) {
// Start a webserver if an address for it was specified
startedNode.flatMap { startWebserver(it) }
@ -296,21 +302,21 @@ class DriverDSLImpl(
}.transpose()
}
private fun startCordformNode(cordform: CordformNode): CordaFuture<NodeHandle> {
private fun startCordformNode(cordform: CordformNode, localNetworkMap: LocalNetworkMap): CordaFuture<NodeHandle> {
val name = CordaX500Name.parse(cordform.name)
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap()
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
val rpcUsers = cordform.rpcUsers
val config = ConfigHelper.loadConfig(
val config = NodeConfig(ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = cordform.config + rpcAddress + notary + mapOf(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
)
)
return startNodeInternal(config, webAddress, null, "200m")
))
return startNodeInternal(config, webAddress, null, "200m", localNetworkMap)
}
private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle {
@ -344,33 +350,80 @@ class DriverDSLImpl(
}
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
if (compatibilityZone == null) {
// Without a compatibility zone URL we have to copy the node info files ourselves to make sure the nodes see each other
nodeInfoFilesCopier = NodeInfoFilesCopier().also {
shutdownManager.registerShutdown(it::close)
val notaryInfosFuture = if (compatibilityZone == null) {
// If no CZ is specified then the driver does the generation of the network parameters and the copying of the
// node info files.
startNotaryIdentityGeneration().map { notaryInfos -> Pair(notaryInfos, LocalNetworkMap(notaryInfos)) }
} else {
// Otherwise it's the CZ's job to distribute thse via the HTTP network map, as that is what the nodes will be expecting.
val notaryInfosFuture = if (compatibilityZone.rootCert == null) {
// No root cert specified so we use the dev root cert to generate the notary identities.
startNotaryIdentityGeneration()
} else {
// With a root cert specified we delegate generation of the notary identities to the CZ.
startAllNotaryRegistrations(compatibilityZone.rootCert, compatibilityZone.url)
}
notaryInfosFuture.map { notaryInfos ->
compatibilityZone.publishNotaries(notaryInfos)
Pair(notaryInfos, null)
}
}
networkMapAvailability = notaryInfosFuture.map { it.second }
_notaries = notaryInfosFuture.map { (notaryInfos, localNetworkMap) ->
val listOfFutureNodeHandles = startNotaries(localNetworkMap)
notaryInfos.zip(listOfFutureNodeHandles) { (identity, validating), nodeHandlesFuture ->
NotaryHandle(identity, validating, nodeHandlesFuture)
}
}
val notaryInfos = generateNotaryIdentities()
networkParameters = testNetworkParameters(notaryInfos)
onNetworkParametersGeneration(networkParameters)
// The network parameters must be serialised before starting any of the nodes
networkParametersCopier = NetworkParametersCopier(networkParameters)
val nodeHandles = startNotaries()
_notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) }
}
private fun generateNotaryIdentities(): List<NotaryInfo> {
return notarySpecs.map { spec ->
val identity = if (spec.cluster == null) {
DevIdentityGenerator.installKeyStoreWithNodeIdentity(baseDirectory(spec.name), spec.name, compatibilityZone?.rootCert)
} else {
DevIdentityGenerator.generateDistributedNotaryIdentity(
dirs = generateNodeNames(spec).map { baseDirectory(it) },
notaryName = spec.name,
customRootCert = compatibilityZone?.rootCert
)
private fun startNotaryIdentityGeneration(): CordaFuture<List<NotaryInfo>> {
return executorService.fork {
notarySpecs.map { spec ->
val identity = if (spec.cluster == null) {
DevIdentityGenerator.installKeyStoreWithNodeIdentity(baseDirectory(spec.name), spec.name)
} else {
DevIdentityGenerator.generateDistributedNotaryIdentity(
dirs = generateNodeNames(spec).map { baseDirectory(it) },
notaryName = spec.name
)
}
NotaryInfo(identity, spec.validating)
}
}
}
private fun startAllNotaryRegistrations(rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<List<NotaryInfo>> {
// Start the registration process for all the notaries together then wait for their responses.
return notarySpecs.map { spec ->
require(spec.cluster == null) { "Registering distributed notaries not supported" }
startNotaryRegistration(spec, rootCert, compatibilityZoneURL)
}.transpose()
}
private fun startNotaryRegistration(spec: NotarySpec, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<NotaryInfo> {
return startNodeRegistration(spec.name, rootCert, compatibilityZoneURL).flatMap { config ->
// Node registration only gives us the node CA cert, not the identity cert. That is only created on first
// startup or when the node is told to just generate its node info file. We do that here.
if (startNodesInProcess) {
executorService.fork {
val nodeInfo = Node(config.corda, MOCK_VERSION_INFO, initialiseSerialization = false).generateAndSaveNodeInfo()
NotaryInfo(nodeInfo.legalIdentities[0], spec.validating)
}
} else {
// TODO The config we use here is uses a hardocded p2p port which changes when the node is run proper
// This causes two node info files to be generated.
startOutOfProcessMiniNode(config, "--just-generate-node-info").map {
// Once done we have to read the signed node info file that's been generated
val nodeInfoFile = config.corda.baseDirectory.list { paths ->
paths.filter { it.fileName.toString().startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.findFirst().get()
}
val nodeInfo = nodeInfoFile.readAll().deserialize<SignedNodeInfo>().verified()
NotaryInfo(nodeInfo.legalIdentities[0], spec.validating)
}
}
NotaryInfo(identity, spec.validating)
}
}
@ -378,11 +431,11 @@ class DriverDSLImpl(
return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(organisation = "${spec.name.organisation}-$it") }
}
private fun startNotaries(): List<CordaFuture<List<NodeHandle>>> {
private fun startNotaries(localNetworkMap: LocalNetworkMap?): List<CordaFuture<List<NodeHandle>>> {
return notarySpecs.map {
when {
it.cluster == null -> startSingleNotary(it)
it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it)
it.cluster == null -> startSingleNotary(it, localNetworkMap)
it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it, localNetworkMap)
else -> throw IllegalArgumentException("BFT-SMaRt not supported")
}
}
@ -392,16 +445,17 @@ class DriverDSLImpl(
// generating the configs for the nodes, probably making use of Any.toConfig()
private fun NotaryConfig.toConfigMap(): Map<String, Any> = mapOf("notary" to toConfig().root().unwrapped())
private fun startSingleNotary(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
return startNode(
providedName = spec.name,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
private fun startSingleNotary(spec: NotarySpec, localNetworkMap: LocalNetworkMap?): CordaFuture<List<NodeHandle>> {
return startRegisteredNode(
spec.name,
localNetworkMap,
spec.rpcUsers,
spec.verifierType,
customOverrides = NotaryConfig(spec.validating).toConfigMap()
).map { listOf(it) }
}
private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
private fun startRaftNotaryCluster(spec: NotarySpec, localNetworkMap: LocalNetworkMap?): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = NotaryConfig(
@ -414,10 +468,11 @@ class DriverDSLImpl(
val clusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNodeFuture = startNode(
providedName = nodeNames[0],
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
val firstNodeFuture = startRegisteredNode(
nodeNames[0],
localNetworkMap,
spec.rpcUsers,
spec.verifierType,
customOverrides = notaryConfig(clusterAddress) + mapOf(
"database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
@ -426,10 +481,11 @@ class DriverDSLImpl(
// All other nodes will join the cluster
val restNodeFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
startNode(
providedName = it,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
startRegisteredNode(
it,
localNetworkMap,
spec.rpcUsers,
spec.verifierType,
customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf(
"database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
@ -446,8 +502,6 @@ class DriverDSLImpl(
return driverDirectory / nodeDirectoryName
}
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
@ -457,7 +511,7 @@ class DriverDSLImpl(
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
return networkMapCacheChangeObservable.map {
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
@ -492,35 +546,44 @@ class DriverDSLImpl(
return future
}
private fun startOutOfProcessNodeRegistration(config: Config, configuration: NodeConfiguration): CordaFuture<Unit> {
/**
* Start the node with the given flag which is expected to start the node for some function, which once complete will
* terminate the node.
*/
private fun startOutOfProcessMiniNode(config: NodeConfig, extraCmdLineFlag: String): CordaFuture<Unit> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort,
systemProperties, cordappPackages, "200m", initialRegistration = true)
val process = startOutOfProcessNode(
config,
quasarJarPath,
debugPort,
jolokiaJarPath,
monitorPort,
systemProperties,
cordappPackages,
"200m",
extraCmdLineFlag
)
return poll(executorService, "node registration (${configuration.myLegalName})") {
return poll(executorService, "$extraCmdLineFlag (${config.corda.myLegalName})") {
if (process.isAlive) null else Unit
}
}
private fun startNodeInternal(config: Config,
private fun startNodeInternal(config: NodeConfig,
webAddress: NetworkHostAndPort,
startInProcess: Boolean?,
maximumHeapSize: String): CordaFuture<NodeHandle> {
val configuration = config.parseAsNodeConfiguration()
val baseDirectory = configuration.baseDirectory.createDirectories()
nodeInfoFilesCopier?.addConfig(baseDirectory)
if (configuration.myLegalName in networkParameters.notaries.map { it.identity.name } || compatibilityZone == null) {
// If a notary is being started, then its identity appears in networkParameters (generated by Driver),
// and Driver itself must pass the network parameters to the notary.
networkParametersCopier?.install(baseDirectory)
}
maximumHeapSize: String,
localNetworkMap: LocalNetworkMap?): CordaFuture<NodeHandle> {
val baseDirectory = config.corda.baseDirectory.createDirectories()
localNetworkMap?.networkParametersCopier?.install(baseDirectory)
localNetworkMap?.nodeInfosCopier?.addConfig(baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier?.removeConfig(baseDirectory)
countObservables.remove(configuration.myLegalName)
localNetworkMap?.nodeInfosCopier?.removeConfig(baseDirectory)
countObservables.remove(config.corda.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages)
val nodeAndThreadFuture = startInProcessNode(executorService, config, cordappPackages)
shutdownManager.registerShutdown(
nodeAndThreadFuture.map { (node, thread) ->
{
@ -530,16 +593,16 @@ class DriverDSLImpl(
}
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(configuration, openFuture()).flatMap { rpc ->
establishRpc(config, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit)
NodeHandle.InProcess(rpc.nodeInfo(), rpc, config.corda, webAddress, node, thread, onNodeExit)
}
}
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, systemProperties, cordappPackages, maximumHeapSize, initialRegistration = false)
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, systemProperties, cordappPackages, maximumHeapSize, null)
if (waitForNodesToFinish) {
state.locked {
processes += process
@ -547,22 +610,21 @@ class DriverDSLImpl(
} else {
shutdownManager.registerProcessShutdown(process)
}
val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process)
val p2pReadyFuture = addressMustBeBoundFuture(executorService, config.corda.p2pAddress, process)
return p2pReadyFuture.flatMap {
val processDeathFuture = poll(executorService, "process death while waiting for RPC (${configuration.myLegalName})") {
val processDeathFuture = poll(executorService, "process death while waiting for RPC (${config.corda.myLegalName})") {
if (process.isAlive) null else process
}
establishRpc(configuration, processDeathFuture).flatMap { rpc ->
establishRpc(config, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(configuration.p2pAddress, process)
throw ListenProcessDeathException(config.corda.p2pAddress, process)
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process,
onNodeExit)
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, config.corda, webAddress, debugPort, process, onNodeExit)
}
}
}
@ -575,6 +637,25 @@ class DriverDSLImpl(
return pollFuture
}
/**
* The local version of the network map, which is a bunch of classes that copy the relevant files to the node directories.
*/
private inner class LocalNetworkMap(notaryInfos: List<NotaryInfo>) {
val networkParametersCopier = NetworkParametersCopier(testNetworkParameters(notaryInfos))
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
val nodeInfosCopier = NodeInfoFilesCopier().also { shutdownManager.registerShutdown(it::close) }
}
/**
* Simple holder class to capture the node configuration both as the raw [Config] object and the parsed [NodeConfiguration].
* Keeping [Config] around is needed as the user may specify extra config options not specified in [NodeConfiguration].
*/
private class NodeConfig(val typesafe: Config) {
val corda: NodeConfiguration = typesafe.parseAsNodeConfiguration()
}
companion object {
internal val log = contextLogger()
@ -602,28 +683,26 @@ class DriverDSLImpl(
private fun startInProcessNode(
executorService: ScheduledExecutorService,
nodeConf: NodeConfiguration,
config: Config,
config: NodeConfig,
cordappPackages: List<String>
): CordaFuture<Pair<StartedNode<Node>, Thread>> {
return executorService.fork {
log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}")
log.info("Starting in-process Node ${config.corda.myLegalName.organisation}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe)
// TODO pass the version in?
val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = nodeConf.myLegalName.organisation) {
val node = InProcessNode(config.corda, MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = config.corda.myLegalName.organisation) {
node.internals.run()
}
node to nodeThread
}.flatMap { nodeAndThread ->
addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
addressMustBeBoundFuture(executorService, config.corda.p2pAddress).map { nodeAndThread }
}
}
private fun startOutOfProcessNode(
nodeConf: NodeConfiguration,
config: Config,
config: NodeConfig,
quasarJarPath: String,
debugPort: Int?,
jolokiaJarPath: String,
@ -631,15 +710,17 @@ class DriverDSLImpl(
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String,
initialRegistration: Boolean
extraCmdLineFlag: String?
): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled") + ", jolokia monitoring port is " + (monitorPort ?: "not enabled"))
log.info("Starting out-of-process Node ${config.corda.myLegalName.organisation}, " +
"debug port is " + (debugPort ?: "not enabled") + ", " +
"jolokia monitoring port is " + (monitorPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe)
val systemProperties = mutableMapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
"name" to config.corda.myLegalName,
"visualvm.display.name" to "corda-${config.corda.myLegalName}",
"java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process
"log4j2.debug" to if(debugPort != null) "true" else "false"
)
@ -664,11 +745,11 @@ class DriverDSLImpl(
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val arguments = mutableListOf(
"--base-directory=${nodeConf.baseDirectory}",
"--base-directory=${config.corda.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell").also {
if (initialRegistration) {
it += "--initial-registration"
if (extraCmdLineFlag != null) {
it += extraCmdLineFlag
}
}.toList()
@ -677,8 +758,8 @@ class DriverDSLImpl(
arguments = arguments,
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments + listOfNotNull(jolokiaAgent),
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
errorLogPath = config.corda.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = config.corda.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
@ -827,8 +908,7 @@ fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
extraCordappPackagesToScan = extraCordappPackagesToScan,
jmxPolicy = jmxPolicy,
notarySpecs = notarySpecs,
compatibilityZone = null,
onNetworkParametersGeneration = {}
compatibilityZone = null
)
)
val shutdownHook = addShutdownHook(driverDsl::shutdown)
@ -846,11 +926,16 @@ fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
}
/**
* Internal API to enable testing of the network map service and node registration process using the internal driver.
* @property url The base CZ URL for registration and network map updates
* @property rootCert If specified then the node will register itself using [url] and expect the registration response
* to be rooted at this cert.
* @property publishNotaries Hook for a network map server to capture the generated [NotaryInfo] objects needed for
* creating the network parameters. This is needed as the network map server is expected to distribute it. The callback
* will occur on a different thread to the driver-calling thread.
* @property rootCert If specified then the nodes will register themselves with the doorman service using [url] and expect
* the registration response to be rooted at this cert. If not specified then no registration is performed and the dev
* root cert is used as normal.
*/
data class CompatibilityZoneParams(val url: URL, val rootCert: X509Certificate? = null)
data class CompatibilityZoneParams(val url: URL, val publishNotaries: (List<NotaryInfo>) -> Unit, val rootCert: X509Certificate? = null)
fun <A> internalDriver(
isDebug: Boolean = DriverParameters().isDebug,
@ -866,7 +951,6 @@ fun <A> internalDriver(
extraCordappPackagesToScan: List<String> = DriverParameters().extraCordappPackagesToScan,
jmxPolicy: JmxPolicy = DriverParameters().jmxPolicy,
compatibilityZone: CompatibilityZoneParams? = null,
onNetworkParametersGeneration: (NetworkParameters) -> Unit = {},
dsl: DriverDSLImpl.() -> A
): A {
return genericDriver(
@ -882,8 +966,7 @@ fun <A> internalDriver(
notarySpecs = notarySpecs,
extraCordappPackagesToScan = extraCordappPackagesToScan,
jmxPolicy = jmxPolicy,
compatibilityZone = compatibilityZone,
onNetworkParametersGeneration = onNetworkParametersGeneration
compatibilityZone = compatibilityZone
),
coerce = { it },
dsl = dsl,

View File

@ -121,8 +121,7 @@ fun <A> rpcDriver(
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs,
jmxPolicy = jmxPolicy,
compatibilityZone = null,
onNetworkParametersGeneration = {}
compatibilityZone = null
), externalTrace
),
coerce = { it },

View File

@ -36,7 +36,7 @@ import javax.ws.rs.core.Response.ok
class NetworkMapServer(cacheTimeout: Duration,
hostAndPort: NetworkHostAndPort,
root_ca: CertificateAndKeyPair = ROOT_CA, // Default to ROOT_CA for testing.
rootCa: CertificateAndKeyPair = ROOT_CA, // Default to ROOT_CA for testing.
private val myHostNameValue: String = "test.host.name",
vararg additionalServices: Any) : Closeable {
companion object {
@ -64,7 +64,7 @@ class NetworkMapServer(cacheTimeout: Duration,
field = networkParameters
}
private val serializedParameters get() = networkParameters.serialize()
private val service = InMemoryNetworkMapService(cacheTimeout, networkMapKeyAndCert(root_ca))
private val service = InMemoryNetworkMapService(cacheTimeout, networkMapKeyAndCert(rootCa))
init {

View File

@ -76,8 +76,7 @@ fun <A> verifierDriver(
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs,
jmxPolicy = jmxPolicy,
compatibilityZone = null,
onNetworkParametersGeneration = { }
compatibilityZone = null
)
),
coerce = { it },