Preliminary work to make merge with master manageable

This commit is contained in:
Shams Asari 2017-12-08 11:46:10 +00:00
parent 4a677815ef
commit db9eb8a63f
9 changed files with 842 additions and 789 deletions

View File

@ -25,7 +25,10 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.time.Duration import java.time.Duration
import java.util.concurrent.* import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
class RPCStabilityTests { class RPCStabilityTests {

View File

@ -20,7 +20,7 @@ import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.testing.* import net.corda.testing.*
import net.corda.testing.DUMMY_BANK_A import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_NOTARY import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.DriverDSLExposedInterface import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices
@ -51,14 +51,14 @@ class AttachmentLoadingTests {
Class.forName("net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator", true, URLClassLoader(arrayOf(isolatedJAR))) Class.forName("net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator", true, URLClassLoader(arrayOf(isolatedJAR)))
.asSubclass(FlowLogic::class.java) .asSubclass(FlowLogic::class.java)
private fun DriverDSLExposedInterface.createTwoNodes(): List<NodeHandle> { private fun DriverDSL.createTwoNodes(): List<NodeHandle> {
return listOf( return listOf(
startNode(providedName = bankAName), startNode(providedName = bankAName),
startNode(providedName = bankBName) startNode(providedName = bankBName)
).transpose().getOrThrow() ).transpose().getOrThrow()
} }
private fun DriverDSLExposedInterface.installIsolatedCordappTo(nodeName: CordaX500Name) { private fun DriverDSL.installIsolatedCordappTo(nodeName: CordaX500Name) {
// Copy the app jar to the first node. The second won't have it. // Copy the app jar to the first node. The second won't have it.
val path = (baseDirectory(nodeName.toString()) / "cordapps").createDirectories() / "isolated.jar" val path = (baseDirectory(nodeName.toString()) / "cordapps").createDirectories() / "isolated.jar"
logger.info("Installing isolated jar to $path") logger.info("Installing isolated jar to $path")

View File

@ -18,7 +18,7 @@ import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.ALICE import net.corda.testing.ALICE
import net.corda.testing.chooseIdentity import net.corda.testing.chooseIdentity
import net.corda.testing.driver.DriverDSLExposedInterface import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.node.ClusterSpec import net.corda.testing.node.ClusterSpec
@ -116,13 +116,13 @@ class P2PMessagingTest {
} }
} }
private fun startDriverWithDistributedService(dsl: DriverDSLExposedInterface.(List<StartedNode<Node>>) -> Unit) { private fun startDriverWithDistributedService(dsl: DriverDSL.(List<StartedNode<Node>>) -> Unit) {
driver(startNodesInProcess = true, notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2)))) { driver(startNodesInProcess = true, notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2)))) {
dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as NodeHandle.InProcess).node }) dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as NodeHandle.InProcess).node })
} }
} }
private fun DriverDSLExposedInterface.startAlice(): StartedNode<Node> { private fun DriverDSL.startAlice(): StartedNode<Node> {
return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)) return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1))
.map { (it as NodeHandle.InProcess).node } .map { (it as NodeHandle.InProcess).node }
.getOrThrow() .getOrThrow()

View File

@ -4,6 +4,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.testing.driver.* import net.corda.testing.driver.*
import net.corda.testing.internal.DriverDSLImpl
import net.corda.testing.internal.ProcessUtilities import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
@ -14,7 +15,7 @@ import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
interface SpringDriverExposedDSLInterface : DriverDSLExposedInterface { interface SpringDriverExposedDSLInterface : DriverDSL {
/** /**
* Starts a Spring Boot application, passes the RPC connection data as parameters the process. * Starts a Spring Boot application, passes the RPC connection data as parameters the process.
@ -55,11 +56,11 @@ fun <A> springDriver(
startNodesInProcess = startNodesInProcess, startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan, extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs, notarySpecs = notarySpecs,
driverDslWrapper = { driverDSL:DriverDSL -> SpringBootDriverDSL(driverDSL) }, driverDslWrapper = { driverDSL: DriverDSLImpl -> SpringBootDriverDSL(driverDSL) },
coerce = { it }, dsl = dsl coerce = { it }, dsl = dsl
) )
data class SpringBootDriverDSL(private val driverDSL: DriverDSL) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface { data class SpringBootDriverDSL(private val driverDSL: DriverDSLImpl) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }

View File

@ -2,56 +2,31 @@
package net.corda.testing.driver package net.corda.testing.driver
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigRenderOptions import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.core.CordaException import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.* import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.* import net.corda.core.internal.copyTo
import net.corda.core.internal.div
import net.corda.core.internal.times
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NotaryService
import net.corda.core.toFuture
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.* import net.corda.node.services.config.VerifierType
import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.nodeapi.internal.addShutdownHook
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
import net.corda.node.utilities.registration.NetworkRegistrationHelper
import net.corda.nodeapi.internal.*
import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs import net.corda.testing.DUMMY_NOTARY
import net.corda.nodeapi.internal.config.toConfig import net.corda.testing.internal.DriverDSLImpl
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.*
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.driver.DriverDSL.ClusterType.*
import net.corda.testing.internal.InProcessNode
import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import okhttp3.OkHttpClient import net.corda.testing.setGlobalSerialization
import okhttp3.Request
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable
import rx.observables.ConnectableObservable
import rx.schedulers.Schedulers
import java.net.* import java.net.*
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
@ -61,14 +36,9 @@ import java.time.Duration
import java.time.Instant import java.time.Instant
import java.time.ZoneOffset.UTC import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
/** /**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests. * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@ -77,115 +47,18 @@ import kotlin.concurrent.thread
* *
* TODO this file is getting way too big, it should be split into several files. * TODO this file is getting way too big, it should be split into several files.
*/ */
private val log: Logger = loggerFor<DriverDSL>() private val log: Logger = loggerFor<DriverDSLImpl>()
private val DEFAULT_POLL_INTERVAL = 500.millis
private const val DEFAULT_WARN_COUNT = 120
/**
* A sub-set of permissions that grant most of the essential operations used in the unit/integration tests as well as
* in demo application like NodeExplorer.
*/
private val DRIVER_REQUIRED_PERMISSIONS = setOf(
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::networkMapFeed),
invokeRpc(CordaRPCOps::networkMapSnapshot),
invokeRpc(CordaRPCOps::notaryIdentities),
invokeRpc(CordaRPCOps::stateMachinesFeed),
invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingFeed),
invokeRpc(CordaRPCOps::nodeInfoFromParty),
invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed),
invokeRpc("vaultQueryBy"),
invokeRpc("vaultTrackBy"),
invokeRpc(CordaRPCOps::registeredFlows)
)
/** /**
* Object ecapsulating a notary started automatically by the driver. * Object ecapsulating a notary started automatically by the driver.
*/ */
data class NotaryHandle(val identity: Party, val validating: Boolean, val nodeHandles: CordaFuture<List<NodeHandle>>) data class NotaryHandle(val identity: Party, val validating: Boolean, val nodeHandles: CordaFuture<List<NodeHandle>>)
/** interface DriverDSLInternalInterface : DriverDSL {
* This is the interface that's exposed to DSL users. private companion object {
*/ private val DEFAULT_POLL_INTERVAL = 500.millis
interface DriverDSLExposedInterface : CordformContext { private const val DEFAULT_WARN_COUNT = 120
/** Returns a list of [NotaryHandle]s matching the list of [NotarySpec]s passed into [driver]. */
val notaryHandles: List<NotaryHandle>
/**
* Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one.
* @see notaryHandles
*/
val defaultNotaryHandle: NotaryHandle
get() {
return when (notaryHandles.size) {
0 -> throw IllegalStateException("There are no notaries defined on the network")
1 -> notaryHandles[0]
else -> throw IllegalStateException("There is more than one notary defined on the network")
} }
}
/**
* Returns the identity of the single notary on the network. Throws if there are none or more than one.
* @see defaultNotaryHandle
*/
val defaultNotaryIdentity: Party get() = defaultNotaryHandle.identity
/**
* Returns a [CordaFuture] on the [NodeHandle] for the single-node notary on the network. Throws if there
* are no notaries or more than one, or if the notary is a distributed cluster.
* @see defaultNotaryHandle
* @see notaryHandles
*/
val defaultNotaryNode: CordaFuture<NodeHandle>
get() {
return defaultNotaryHandle.nodeHandles.map {
it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node")
}
}
/**
* Start a node.
*
* @param defaultParameters The default parameters for the node. Allows the node to be configured in builder style
* when called from Java code.
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available.
*/
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),
providedName: CordaX500Name? = defaultParameters.providedName,
rpcUsers: List<User> = defaultParameters.rpcUsers,
verifierType: VerifierType = defaultParameters.verifierType,
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
maximumHeapSize: String = defaultParameters.maximumHeapSize
): CordaFuture<NodeHandle>
/**
* Helper function for starting a [Node] with custom parameters from Java.
*
* @param parameters The default parameters for the driver.
* @return [NodeHandle] that will be available sometime in the future.
*/
fun startNode(parameters: NodeParameters): CordaFuture<NodeHandle> = startNode(defaultParameters = parameters)
/** Call [startWebserver] with a default maximumHeapSize. */
fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle> = startWebserver(handle, "200m")
/**
* Starts a web server for a node
* @param handle The handle for the node that this webserver connects to via RPC.
* @param maximumHeapSize Argument for JVM -Xmx option e.g. "200m".
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
/** /**
* Polls a function until it returns a non-null value. Note that there is no timeout on the polling. * Polls a function until it returns a non-null value. Note that there is no timeout on the polling.
@ -206,11 +79,8 @@ interface DriverDSLExposedInterface : CordformContext {
return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null } return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null }
} }
val shutdownManager: ShutdownManager
}
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start() fun start()
fun shutdown() fun shutdown()
} }
@ -320,7 +190,7 @@ data class NodeParameters(
* (...) * (...)
* } * }
* *
* Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [CordaFuture] * Note that [DriverDSLImpl.startNode] does not wait for the node to start up synchronously, but rather returns a [CordaFuture]
* of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service or * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service or
* loaded node data from database. * loaded node data from database.
* *
@ -329,15 +199,15 @@ data class NodeParameters(
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging. * @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode and have debug level logging.
* @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node * @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>" * directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
* and may be specified in [DriverDSL.startNode]. * and may be specified in [DriverDSLImpl.startNode].
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental. * @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental. * @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty. * @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty.
* @param useTestClock If true the test clock will be used in Node. * @param useTestClock If true the test clock will be used in Node.
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or * @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode]. * not. Note that this may be overridden in [DriverDSL.startNode].
* @param notarySpecs The notaries advertised for this network. These nodes will be started automatically and will be * @param notarySpecs The notaries advertised for this network. These nodes will be started automatically and will be
* available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary. * available from [DriverDSL.notaryHandles]. Defaults to a simple validating notary.
* @param dsl The dsl itself. * @param dsl The dsl itself.
* @return The value returned in the [dsl] closure. * @return The value returned in the [dsl] closure.
*/ */
@ -354,10 +224,10 @@ fun <A> driver(
waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish, waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish,
notarySpecs: List<NotarySpec> = defaultParameters.notarySpecs, notarySpecs: List<NotarySpec> = defaultParameters.notarySpecs,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan, extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
dsl: DriverDSLExposedInterface.() -> A dsl: DriverDSL.() -> A
): A { ): A {
return genericDriver( return genericDriver(
driverDsl = DriverDSL( driverDsl = DriverDSLImpl(
portAllocation = portAllocation, portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation, debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties, systemProperties = systemProperties,
@ -398,10 +268,10 @@ fun <A> internalDriver(
notarySpecs: List<NotarySpec> = DriverParameters().notarySpecs, notarySpecs: List<NotarySpec> = DriverParameters().notarySpecs,
extraCordappPackagesToScan: List<String> = DriverParameters().extraCordappPackagesToScan, extraCordappPackagesToScan: List<String> = DriverParameters().extraCordappPackagesToScan,
compatibilityZone: CompatibilityZoneParams? = null, compatibilityZone: CompatibilityZoneParams? = null,
dsl: DriverDSL.() -> A dsl: DriverDSLImpl.() -> A
): A { ): A {
return genericDriver( return genericDriver(
driverDsl = DriverDSL( driverDsl = DriverDSLImpl(
portAllocation = portAllocation, portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation, debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties, systemProperties = systemProperties,
@ -429,7 +299,7 @@ fun <A> internalDriver(
*/ */
fun <A> driver( fun <A> driver(
parameters: DriverParameters, parameters: DriverParameters,
dsl: DriverDSLExposedInterface.() -> A dsl: DriverDSL.() -> A
): A { ): A {
return driver(defaultParameters = parameters, dsl = dsl) return driver(defaultParameters = parameters, dsl = dsl)
} }
@ -464,13 +334,13 @@ data class DriverParameters(
/** /**
* This is a helper method to allow extending of the DSL, along the lines of * This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface * interface SomeOtherExposedDSLInterface : DriverDSL
* interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface * interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface * class SomeOtherDSL(val driverDSL : DriverDSLImpl) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface
* *
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/ */
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver( fun <DI : DriverDSL, D : DriverDSLInternalInterface, A> genericDriver(
driverDsl: D, driverDsl: D,
initialiseSerialization: Boolean = true, initialiseSerialization: Boolean = true,
coerce: (D) -> DI, coerce: (D) -> DI,
@ -493,13 +363,13 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
/** /**
* This is a helper method to allow extending of the DSL, along the lines of * This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface * interface SomeOtherExposedDSLInterface : DriverDSL
* interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface * interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface * class SomeOtherDSL(val driverDSL : DriverDSLImpl) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface
* *
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/ */
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver( fun <DI : DriverDSL, D : DriverDSLInternalInterface, A> genericDriver(
defaultParameters: DriverParameters = DriverParameters(), defaultParameters: DriverParameters = DriverParameters(),
isDebug: Boolean = defaultParameters.isDebug, isDebug: Boolean = defaultParameters.isDebug,
driverDirectory: Path = defaultParameters.driverDirectory, driverDirectory: Path = defaultParameters.driverDirectory,
@ -512,13 +382,13 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
notarySpecs: List<NotarySpec>, notarySpecs: List<NotarySpec>,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan, extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
driverDslWrapper: (DriverDSL) -> D, driverDslWrapper: (DriverDSLImpl) -> D,
coerce: (D) -> DI, coerce: (D) -> DI,
dsl: DI.() -> A dsl: DI.() -> A
): A { ): A {
val serializationEnv = setGlobalSerialization(initialiseSerialization) val serializationEnv = setGlobalSerialization(initialiseSerialization)
val driverDsl = driverDslWrapper( val driverDsl = driverDslWrapper(
DriverDSL( DriverDSLImpl(
portAllocation = portAllocation, portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation, debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties, systemProperties = systemProperties,
@ -625,614 +495,6 @@ fun <A> poll(
return resultFuture return resultFuture
} }
class DriverDSL(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val systemProperties: Map<String, String>,
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val startNodesInProcess: Boolean,
val waitForNodesToFinish: Boolean,
extraCordappPackagesToScan: List<String>,
val notarySpecs: List<NotarySpec>,
val compatibilityZone: CompatibilityZoneParams?
) : DriverDSLInternalInterface {
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
// TODO: NodeInfoFilesCopier create observable threads in the init method, we should move that to a start method instead, changing this to lateinit instead to prevent that.
private lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
private lateinit var _notaries: List<NotaryHandle>
override val notaryHandles: List<NotaryHandle> get() = _notaries
private var networkParameters: NetworkParametersCopier? = null
class State {
val processes = ArrayList<Process>()
}
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy {
val cl = ClassLoader.getSystemClassLoader()
val urls = (cl as URLClassLoader).urLs
val quasarPattern = ".*quasar.*\\.jar$".toRegex()
val quasarFileUrl = urls.first { quasarPattern.matches(it.path) }
Paths.get(quasarFileUrl.toURI()).toString()
}
override fun shutdown() {
if (waitForNodesToFinish) {
state.locked {
processes.forEach { it.waitFor() }
}
}
_shutdownManager?.shutdown()
_executorService?.shutdownNow()
}
private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.rpcAddress!!
val client = CordaRPCClient(rpcAddress)
val connectionFuture = poll(executorService, "RPC connection") {
try {
client.start(config.rpcUsers[0].username, config.rpcUsers[0].password)
} catch (e: Exception) {
if (processDeathFuture.isDone) throw e
log.error("Exception $e, Retrying RPC connection at $rpcAddress")
null
}
}
return firstOf(connectionFuture, processDeathFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(rpcAddress, processDeathFuture.getOrThrow())
}
val connection = connectionFuture.getOrThrow()
shutdownManager.registerShutdown(connection::close)
connection.proxy
}
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String
): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val registrationFuture = if (compatibilityZone?.rootCert != null) {
nodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
} else {
doneFuture(Unit)
}
return registrationFuture.flatMap {
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val configMap = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = if (compatibilityZone != null) {
configMap + mapOf("compatibilityZoneURL" to compatibilityZone.url.toString())
} else {
configMap
}
)
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
}
}
private fun nodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<Unit> {
val baseDirectory = baseDirectory(providedName).createDirectories()
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory,
allowMissingConfig = true,
configOverrides = configOf(
"p2pAddress" to "localhost:1222", // required argument, not really used
"compatibilityZoneURL" to compatibilityZoneURL.toString(),
"myLegalName" to providedName.toString())
)
val configuration = config.parseAsNodeConfiguration()
configuration.rootCertFile.parent.createDirectories()
X509Utilities.saveCertificateAsPEMFile(rootCert, configuration.rootCertFile)
return if (startNodesInProcess) {
// This is a bit cheating, we're not starting a full node, we're just calling the code nodes call
// when registering.
NetworkRegistrationHelper(configuration, HTTPNetworkRegistrationService(compatibilityZoneURL)).buildKeystore()
doneFuture(Unit)
} else {
startOutOfProcessNodeRegistration(config, configuration)
}
}
private enum class ClusterType(val validating: Boolean, val clusterName: CordaX500Name) {
VALIDATING_RAFT(true, CordaX500Name(RaftValidatingNotaryService.id, "Raft", "Zurich", "CH")),
NON_VALIDATING_RAFT(false, CordaX500Name(RaftNonValidatingNotaryService.id, "Raft", "Zurich", "CH")),
NON_VALIDATING_BFT(false, CordaX500Name(BFTNonValidatingNotaryService.id, "BFT", "Zurich", "CH"))
}
internal fun startCordformNodes(cordforms: List<CordformNode>): CordaFuture<*> {
val clusterNodes = HashMultimap.create<ClusterType, CordaX500Name>()
val notaryInfos = ArrayList<NotaryInfo>()
// Go though the node definitions and pick out the notaries so that we can generate their identities to be used
// in the network parameters
for (cordform in cordforms) {
if (cordform.notary == null) continue
val name = CordaX500Name.parse(cordform.name)
val notaryConfig = ConfigFactory.parseMap(cordform.notary).parseAs<NotaryConfig>()
// We need to first group the nodes that form part of a cluser. We assume for simplicity that nodes of the
// same cluster type and validating flag are part of the same cluster.
if (notaryConfig.raft != null) {
val key = if (notaryConfig.validating) VALIDATING_RAFT else NON_VALIDATING_RAFT
clusterNodes.put(key, name)
} else if (notaryConfig.bftSMaRt != null) {
clusterNodes.put(NON_VALIDATING_BFT, name)
} else {
// We have all we need here to generate the identity for single node notaries
val identity = ServiceIdentityGenerator.generateToDisk(
dirs = listOf(baseDirectory(name)),
serviceName = name,
serviceId = "identity"
)
notaryInfos += NotaryInfo(identity, notaryConfig.validating)
}
}
clusterNodes.asMap().forEach { type, nodeNames ->
val identity = ServiceIdentityGenerator.generateToDisk(
dirs = nodeNames.map { baseDirectory(it) },
serviceName = type.clusterName,
serviceId = NotaryService.constructId(
validating = type.validating,
raft = type in setOf(VALIDATING_RAFT, NON_VALIDATING_RAFT),
bft = type == NON_VALIDATING_BFT
)
)
notaryInfos += NotaryInfo(identity, type.validating)
}
networkParameters = NetworkParametersCopier(testNetworkParameters(notaryInfos))
return cordforms.map {
val startedNode = startCordformNode(it)
if (it.webAddress != null) {
// Start a webserver if an address for it was specified
startedNode.flatMap { startWebserver(it) }
} else {
startedNode
}
}.transpose()
}
private fun startCordformNode(cordform: CordformNode): CordaFuture<NodeHandle> {
val name = CordaX500Name.parse(cordform.name)
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap()
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
val rpcUsers = cordform.rpcUsers
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = cordform.config + rpcAddress + notary + mapOf(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
)
)
return startNodeInternal(config, webAddress, null, "200m")
}
private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle {
val protocol = if (handle.configuration.useHTTPS) "https://" else "http://"
val url = URL("$protocol${handle.webAddress}/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, SECONDS).readTimeout(60, SECONDS).build()
while (process.isAlive) try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return WebserverHandle(handle.webAddress, process)
}
} catch (e: ConnectException) {
log.debug("Retrying webserver info at ${handle.webAddress}")
}
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
}
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = DriverDSL.startWebserver(handle, debugPort, maximumHeapSize)
shutdownManager.registerProcessShutdown(process)
val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process)
return webReadyFuture.map { queryWebserver(handle, process) }
}
override fun start() {
if (startNodesInProcess) {
Schedulers.reset()
}
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
nodeInfoFilesCopier = NodeInfoFilesCopier()
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
val notaryInfos = generateNotaryIdentities()
// The network parameters must be serialised before starting any of the nodes
networkParameters = NetworkParametersCopier(testNetworkParameters(notaryInfos))
val nodeHandles = startNotaries()
_notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) }
}
private fun generateNotaryIdentities(): List<NotaryInfo> {
return notarySpecs.map { spec ->
val identity = if (spec.cluster == null) {
ServiceIdentityGenerator.generateToDisk(
dirs = listOf(baseDirectory(spec.name)),
serviceName = spec.name,
serviceId = "identity",
customRootCert = compatibilityZone?.rootCert
)
} else {
ServiceIdentityGenerator.generateToDisk(
dirs = generateNodeNames(spec).map { baseDirectory(it) },
serviceName = spec.name,
serviceId = NotaryService.constructId(
validating = spec.validating,
raft = spec.cluster is ClusterSpec.Raft
),
customRootCert = compatibilityZone?.rootCert
)
}
NotaryInfo(identity, spec.validating)
}
}
private fun generateNodeNames(spec: NotarySpec): List<CordaX500Name> {
return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(organisation = "${spec.name.organisation}-$it") }
}
private fun startNotaries(): List<CordaFuture<List<NodeHandle>>> {
return notarySpecs.map {
when {
it.cluster == null -> startSingleNotary(it)
it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it)
else -> throw IllegalArgumentException("BFT-SMaRt not supported")
}
}
}
// TODO This mapping is done is several places including the gradle plugin. In general we need a better way of
// generating the configs for the nodes, probably making use of Any.toConfig()
private fun NotaryConfig.toConfigMap(): Map<String, Any> = mapOf("notary" to toConfig().root().unwrapped())
private fun startSingleNotary(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
return startNode(
providedName = spec.name,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = NotaryConfig(spec.validating).toConfigMap()
).map { listOf(it) }
}
private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = NotaryConfig(
validating = spec.validating,
raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses))
return config.toConfigMap()
}
val nodeNames = generateNodeNames(spec)
val clusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNodeFuture = startNode(
providedName = nodeNames[0],
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(clusterAddress) + mapOf(
"database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
// All other nodes will join the cluster
val restNodeFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
startNode(
providedName = it,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf(
"database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
}
return firstNodeFuture.flatMap { first ->
restNodeFutures.transpose().map { rest -> listOf(first) + rest }
}
}
fun baseDirectory(nodeName: CordaX500Name): Path {
val nodeDirectoryName = nodeName.organisation.filter { !it.isWhitespace() }
return driverDirectory / nodeDirectoryName
}
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
private fun startOutOfProcessNodeRegistration(config: Config, configuration: NodeConfiguration): CordaFuture<Unit> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort,
systemProperties, cordappPackages, "200m", initialRegistration = true)
return poll(executorService, "node registration (${configuration.myLegalName})") {
if (process.isAlive) null else Unit
}
}
private fun startNodeInternal(config: Config,
webAddress: NetworkHostAndPort,
startInProcess: Boolean?,
maximumHeapSize: String): CordaFuture<NodeHandle> {
val configuration = config.parseAsNodeConfiguration()
val baseDirectory = configuration.baseDirectory.createDirectories()
// Distribute node info file using file copier when network map service URL (compatibilityZoneURL) is null.
// TODO: need to implement the same in cordformation?
val nodeInfoFilesCopier = if (compatibilityZone == null) nodeInfoFilesCopier else null
nodeInfoFilesCopier?.addConfig(baseDirectory)
networkParameters!!.install(baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier?.removeConfig(baseDirectory)
countObservables.remove(configuration.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages)
shutdownManager.registerShutdown(
nodeAndThreadFuture.map { (node, thread) ->
{
node.dispose()
thread.interrupt()
}
}
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(configuration, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit)
}
}
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort,
systemProperties, cordappPackages, maximumHeapSize, initialRegistration = false)
if (waitForNodesToFinish) {
state.locked {
processes += process
}
} else {
shutdownManager.registerProcessShutdown(process)
}
val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process)
return p2pReadyFuture.flatMap {
val processDeathFuture = poll(executorService, "process death while waiting for RPC (${configuration.myLegalName})") {
if (process.isAlive) null else process
}
establishRpc(configuration, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(configuration.p2pAddress, process)
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process,
onNodeExit)
}
}
}
}
}
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture<A> {
val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check)
shutdownManager.registerShutdown { pollFuture.cancel(true) }
return pollFuture
}
companion object {
private val defaultRpcUserList = listOf(User("default", "default", setOf("ALL")).toConfig().root().unwrapped())
private val names = arrayOf(
ALICE.name,
BOB.name,
DUMMY_BANK_A.name
)
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]
private fun startInProcessNode(
executorService: ScheduledExecutorService,
nodeConf: NodeConfiguration,
config: Config,
cordappPackages: List<String>
): CordaFuture<Pair<StartedNode<Node>, Thread>> {
return executorService.fork {
log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
// TODO pass the version in?
val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = nodeConf.myLegalName.organisation) {
node.internals.run()
}
node to nodeThread
}.flatMap { nodeAndThread ->
addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
}
}
private fun startOutOfProcessNode(
nodeConf: NodeConfiguration,
config: Config,
quasarJarPath: String,
debugPort: Int?,
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String,
initialRegistration: Boolean
): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator),
"java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process
"log4j2.debug" to if(debugPort != null) "true" else "false"
)
// See experimental/quasar-hook/README.md for how to generate.
val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" +
"com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" +
"com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" +
"io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" +
"org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" +
"org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" +
"org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)"
val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val arguments = mutableListOf<String>(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell").also {
if (initialRegistration) {
it += "--initial-registration"
}
}.toList()
return ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = arguments,
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process {
val className = "net.corda.webserver.WebServer"
return ProcessUtilities.startCordaProcess(
className = className, // cannot directly get class for this, so just use string
arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()),
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
),
errorLogPath = Paths.get("error.$className.log"),
workingDirectory = null,
maximumHeapSize = maximumHeapSize
)
}
private fun getCallerPackage(): String {
return Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`?.name }
?: throw IllegalStateException("Function instantiating driver must be defined in a package.")
}
/**
* We have an alternative way of specifying classpath for spawned process: by using "-cp" option. So duplicating the setting of this
* rather long string is un-necessary and can be harmful on Windows.
*/
private fun Map<String, Any>.removeResolvedClasspath(): Map<String, Any> {
return filterNot { it.key == "java.class.path" }
}
}
}
fun writeConfig(path: Path, filename: String, config: Config) { fun writeConfig(path: Path, filename: String, config: Config) {
val configString = config.root().render(ConfigRenderOptions.defaults()) val configString = config.root().render(ConfigRenderOptions.defaults())
configString.byteInputStream().copyTo(path / filename, REPLACE_EXISTING) configString.byteInputStream().copyTo(path / filename, REPLACE_EXISTING)

View File

@ -0,0 +1,90 @@
package net.corda.testing.driver
import net.corda.cordform.CordformContext
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.node.services.config.VerifierType
import net.corda.nodeapi.internal.config.User
interface DriverDSL : CordformContext {
/** Returns a list of [NotaryHandle]s matching the list of [NotarySpec]s passed into [driver]. */
val notaryHandles: List<NotaryHandle>
/**
* Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one.
* @see notaryHandles
*/
val defaultNotaryHandle: NotaryHandle
get() {
return when (notaryHandles.size) {
0 -> throw IllegalStateException("There are no notaries defined on the network")
1 -> notaryHandles[0]
else -> throw IllegalStateException("There is more than one notary defined on the network")
}
}
/**
* Returns the identity of the single notary on the network. Throws if there are none or more than one.
* @see defaultNotaryHandle
*/
val defaultNotaryIdentity: Party get() = defaultNotaryHandle.identity
/**
* Returns a [CordaFuture] on the [NodeHandle] for the single-node notary on the network. Throws if there
* are no notaries or more than one, or if the notary is a distributed cluster.
* @see defaultNotaryHandle
* @see notaryHandles
*/
val defaultNotaryNode: CordaFuture<NodeHandle>
get() {
return defaultNotaryHandle.nodeHandles.map {
it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node")
}
}
/**
* Start a node.
*
* @param defaultParameters The default parameters for the node. Allows the node to be configured in builder style
* when called from Java code.
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available.
*/
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),
providedName: CordaX500Name? = defaultParameters.providedName,
rpcUsers: List<User> = defaultParameters.rpcUsers,
verifierType: VerifierType = defaultParameters.verifierType,
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
maximumHeapSize: String = defaultParameters.maximumHeapSize
): CordaFuture<NodeHandle>
/**
* Helper function for starting a [Node] with custom parameters from Java.
*
* @param parameters The default parameters for the driver.
* @return [NodeHandle] that will be available sometime in the future.
*/
fun startNode(parameters: NodeParameters): CordaFuture<NodeHandle> = startNode(defaultParameters = parameters)
/** Call [startWebserver] with a default maximumHeapSize. */
fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle> = startWebserver(handle, "200m")
/**
* Starts a web server for a node
* @param handle The handle for the node that this webserver connects to via RPC.
* @param maximumHeapSize Argument for JVM -Xmx option e.g. "200m".
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
val shutdownManager: ShutdownManager
}

View File

@ -0,0 +1,696 @@
package net.corda.testing.internal
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import net.corda.client.rpc.CordaRPCClient
import net.corda.cordform.CordformNode
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.*
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NotaryService
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions
import net.corda.node.services.config.*
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
import net.corda.node.utilities.registration.NetworkRegistrationHelper
import net.corda.nodeapi.internal.NetworkParametersCopier
import net.corda.nodeapi.internal.NodeInfoFilesCopier
import net.corda.nodeapi.internal.NotaryInfo
import net.corda.nodeapi.internal.ServiceIdentityGenerator
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.driver.*
import net.corda.testing.internal.DriverDSLImpl.ClusterType.NON_VALIDATING_RAFT
import net.corda.testing.internal.DriverDSLImpl.ClusterType.VALIDATING_RAFT
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices
import net.corda.testing.node.NotarySpec
import okhttp3.OkHttpClient
import okhttp3.Request
import rx.Observable
import rx.observables.ConnectableObservable
import rx.schedulers.Schedulers
import java.net.ConnectException
import java.net.URL
import java.net.URLClassLoader
import java.nio.file.Path
import java.nio.file.Paths
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
class DriverDSLImpl(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val systemProperties: Map<String, String>,
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val startNodesInProcess: Boolean,
val waitForNodesToFinish: Boolean,
extraCordappPackagesToScan: List<String>,
val notarySpecs: List<NotarySpec>,
val compatibilityZone: CompatibilityZoneParams?
) : DriverDSLInternalInterface {
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
// TODO: NodeInfoFilesCopier create observable threads in the init method, we should move that to a start method instead, changing this to lateinit instead to prevent that.
private lateinit var nodeInfoFilesCopier: NodeInfoFilesCopier
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
private lateinit var _notaries: List<NotaryHandle>
override val notaryHandles: List<NotaryHandle> get() = _notaries
private var networkParameters: NetworkParametersCopier? = null
class State {
val processes = ArrayList<Process>()
}
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy {
val cl = ClassLoader.getSystemClassLoader()
val urls = (cl as URLClassLoader).urLs
val quasarPattern = ".*quasar.*\\.jar$".toRegex()
val quasarFileUrl = urls.first { quasarPattern.matches(it.path) }
Paths.get(quasarFileUrl.toURI()).toString()
}
override fun shutdown() {
if (waitForNodesToFinish) {
state.locked {
processes.forEach { it.waitFor() }
}
}
_shutdownManager?.shutdown()
_executorService?.shutdownNow()
}
private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.rpcAddress!!
val client = CordaRPCClient(rpcAddress)
val connectionFuture = poll(executorService, "RPC connection") {
try {
client.start(config.rpcUsers[0].username, config.rpcUsers[0].password)
} catch (e: Exception) {
if (processDeathFuture.isDone) throw e
log.error("Exception $e, Retrying RPC connection at $rpcAddress")
null
}
}
return firstOf(connectionFuture, processDeathFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(rpcAddress, processDeathFuture.getOrThrow())
}
val connection = connectionFuture.getOrThrow()
shutdownManager.registerShutdown(connection::close)
connection.proxy
}
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String
): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val registrationFuture = if (compatibilityZone?.rootCert != null) {
nodeRegistration(name, compatibilityZone.rootCert, compatibilityZone.url)
} else {
doneFuture(Unit)
}
return registrationFuture.flatMap {
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val configMap = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = if (compatibilityZone != null) {
configMap + mapOf("compatibilityZoneURL" to compatibilityZone.url.toString())
} else {
configMap
}
)
startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
}
}
private fun nodeRegistration(providedName: CordaX500Name, rootCert: X509Certificate, compatibilityZoneURL: URL): CordaFuture<Unit> {
val baseDirectory = baseDirectory(providedName).createDirectories()
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory,
allowMissingConfig = true,
configOverrides = configOf(
"p2pAddress" to "localhost:1222", // required argument, not really used
"compatibilityZoneURL" to compatibilityZoneURL.toString(),
"myLegalName" to providedName.toString())
)
val configuration = config.parseAsNodeConfiguration()
configuration.rootCertFile.parent.createDirectories()
X509Utilities.saveCertificateAsPEMFile(rootCert, configuration.rootCertFile)
return if (startNodesInProcess) {
// This is a bit cheating, we're not starting a full node, we're just calling the code nodes call
// when registering.
NetworkRegistrationHelper(configuration, HTTPNetworkRegistrationService(compatibilityZoneURL)).buildKeystore()
doneFuture(Unit)
} else {
startOutOfProcessNodeRegistration(config, configuration)
}
}
private enum class ClusterType(val validating: Boolean, val clusterName: CordaX500Name) {
VALIDATING_RAFT(true, CordaX500Name(RaftValidatingNotaryService.id, "Raft", "Zurich", "CH")),
NON_VALIDATING_RAFT(false, CordaX500Name(RaftNonValidatingNotaryService.id, "Raft", "Zurich", "CH")),
NON_VALIDATING_BFT(false, CordaX500Name(BFTNonValidatingNotaryService.id, "BFT", "Zurich", "CH"))
}
internal fun startCordformNodes(cordforms: List<CordformNode>): CordaFuture<*> {
val clusterNodes = HashMultimap.create<ClusterType, CordaX500Name>()
val notaryInfos = ArrayList<NotaryInfo>()
// Go though the node definitions and pick out the notaries so that we can generate their identities to be used
// in the network parameters
for (cordform in cordforms) {
if (cordform.notary == null) continue
val name = CordaX500Name.parse(cordform.name)
val notaryConfig = ConfigFactory.parseMap(cordform.notary).parseAs<NotaryConfig>()
// We need to first group the nodes that form part of a cluser. We assume for simplicity that nodes of the
// same cluster type and validating flag are part of the same cluster.
if (notaryConfig.raft != null) {
val key = if (notaryConfig.validating) VALIDATING_RAFT else NON_VALIDATING_RAFT
clusterNodes.put(key, name)
} else if (notaryConfig.bftSMaRt != null) {
clusterNodes.put(ClusterType.NON_VALIDATING_BFT, name)
} else {
// We have all we need here to generate the identity for single node notaries
val identity = ServiceIdentityGenerator.generateToDisk(
dirs = listOf(baseDirectory(name)),
serviceName = name,
serviceId = "identity"
)
notaryInfos += NotaryInfo(identity, notaryConfig.validating)
}
}
clusterNodes.asMap().forEach { type, nodeNames ->
val identity = ServiceIdentityGenerator.generateToDisk(
dirs = nodeNames.map { baseDirectory(it) },
serviceName = type.clusterName,
serviceId = NotaryService.constructId(
validating = type.validating,
raft = type in setOf(VALIDATING_RAFT, NON_VALIDATING_RAFT),
bft = type == ClusterType.NON_VALIDATING_BFT
)
)
notaryInfos += NotaryInfo(identity, type.validating)
}
networkParameters = NetworkParametersCopier(testNetworkParameters(notaryInfos))
return cordforms.map {
val startedNode = startCordformNode(it)
if (it.webAddress != null) {
// Start a webserver if an address for it was specified
startedNode.flatMap { startWebserver(it) }
} else {
startedNode
}
}.transpose()
}
private fun startCordformNode(cordform: CordformNode): CordaFuture<NodeHandle> {
val name = CordaX500Name.parse(cordform.name)
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap()
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
val rpcUsers = cordform.rpcUsers
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = cordform.config + rpcAddress + notary + mapOf(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
)
)
return startNodeInternal(config, webAddress, null, "200m")
}
private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle {
val protocol = if (handle.configuration.useHTTPS) "https://" else "http://"
val url = URL("$protocol${handle.webAddress}/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build()
while (process.isAlive) try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return WebserverHandle(handle.webAddress, process)
}
} catch (e: ConnectException) {
log.debug("Retrying webserver info at ${handle.webAddress}")
}
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
}
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startWebserver(handle, debugPort, maximumHeapSize)
shutdownManager.registerProcessShutdown(process)
val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process)
return webReadyFuture.map { queryWebserver(handle, process) }
}
override fun start() {
if (startNodesInProcess) {
Schedulers.reset()
}
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
nodeInfoFilesCopier = NodeInfoFilesCopier()
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
val notaryInfos = generateNotaryIdentities()
// The network parameters must be serialised before starting any of the nodes
networkParameters = NetworkParametersCopier(testNetworkParameters(notaryInfos))
val nodeHandles = startNotaries()
_notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) }
}
private fun generateNotaryIdentities(): List<NotaryInfo> {
return notarySpecs.map { spec ->
val identity = if (spec.cluster == null) {
ServiceIdentityGenerator.generateToDisk(
dirs = listOf(baseDirectory(spec.name)),
serviceName = spec.name,
serviceId = "identity",
customRootCert = compatibilityZone?.rootCert
)
} else {
ServiceIdentityGenerator.generateToDisk(
dirs = generateNodeNames(spec).map { baseDirectory(it) },
serviceName = spec.name,
serviceId = NotaryService.constructId(
validating = spec.validating,
raft = spec.cluster is ClusterSpec.Raft
),
customRootCert = compatibilityZone?.rootCert
)
}
NotaryInfo(identity, spec.validating)
}
}
private fun generateNodeNames(spec: NotarySpec): List<CordaX500Name> {
return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(organisation = "${spec.name.organisation}-$it") }
}
private fun startNotaries(): List<CordaFuture<List<NodeHandle>>> {
return notarySpecs.map {
when {
it.cluster == null -> startSingleNotary(it)
it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it)
else -> throw IllegalArgumentException("BFT-SMaRt not supported")
}
}
}
// TODO This mapping is done is several places including the gradle plugin. In general we need a better way of
// generating the configs for the nodes, probably making use of Any.toConfig()
private fun NotaryConfig.toConfigMap(): Map<String, Any> = mapOf("notary" to toConfig().root().unwrapped())
private fun startSingleNotary(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
return startNode(
providedName = spec.name,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = NotaryConfig(spec.validating).toConfigMap()
).map { listOf(it) }
}
private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = NotaryConfig(
validating = spec.validating,
raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses))
return config.toConfigMap()
}
val nodeNames = generateNodeNames(spec)
val clusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNodeFuture = startNode(
providedName = nodeNames[0],
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(clusterAddress) + mapOf(
"database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
// All other nodes will join the cluster
val restNodeFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
startNode(
providedName = it,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf(
"database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
}
return firstNodeFuture.flatMap { first ->
restNodeFutures.transpose().map { rest -> listOf(first) + rest }
}
}
fun baseDirectory(nodeName: CordaX500Name): Path {
val nodeDirectoryName = nodeName.organisation.filter { !it.isWhitespace() }
return driverDirectory / nodeDirectoryName
}
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
private fun startOutOfProcessNodeRegistration(config: Config, configuration: NodeConfiguration): CordaFuture<Unit> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort,
systemProperties, cordappPackages, "200m", initialRegistration = true)
return poll(executorService, "node registration (${configuration.myLegalName})") {
if (process.isAlive) null else Unit
}
}
private fun startNodeInternal(config: Config,
webAddress: NetworkHostAndPort,
startInProcess: Boolean?,
maximumHeapSize: String): CordaFuture<NodeHandle> {
val configuration = config.parseAsNodeConfiguration()
val baseDirectory = configuration.baseDirectory.createDirectories()
// Distribute node info file using file copier when network map service URL (compatibilityZoneURL) is null.
// TODO: need to implement the same in cordformation?
val nodeInfoFilesCopier = if (compatibilityZone == null) nodeInfoFilesCopier else null
nodeInfoFilesCopier?.addConfig(baseDirectory)
networkParameters!!.install(baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier?.removeConfig(baseDirectory)
countObservables.remove(configuration.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages)
shutdownManager.registerShutdown(
nodeAndThreadFuture.map { (node, thread) ->
{
node.dispose()
thread.interrupt()
}
}
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(configuration, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit)
}
}
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort,
systemProperties, cordappPackages, maximumHeapSize, initialRegistration = false)
if (waitForNodesToFinish) {
state.locked {
processes += process
}
} else {
shutdownManager.registerProcessShutdown(process)
}
val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process)
return p2pReadyFuture.flatMap {
val processDeathFuture = poll(executorService, "process death while waiting for RPC (${configuration.myLegalName})") {
if (process.isAlive) null else process
}
establishRpc(configuration, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(configuration.p2pAddress, process)
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process,
onNodeExit)
}
}
}
}
}
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture<A> {
val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check)
shutdownManager.registerShutdown { pollFuture.cancel(true) }
return pollFuture
}
companion object {
internal val log = contextLogger()
private val defaultRpcUserList = listOf(User("default", "default", setOf("ALL")).toConfig().root().unwrapped())
private val names = arrayOf(
ALICE.name,
BOB.name,
DUMMY_BANK_A.name
)
/**
* A sub-set of permissions that grant most of the essential operations used in the unit/integration tests as well as
* in demo application like NodeExplorer.
*/
private val DRIVER_REQUIRED_PERMISSIONS = setOf(
Permissions.invokeRpc(CordaRPCOps::nodeInfo),
Permissions.invokeRpc(CordaRPCOps::networkMapFeed),
Permissions.invokeRpc(CordaRPCOps::networkMapSnapshot),
Permissions.invokeRpc(CordaRPCOps::notaryIdentities),
Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed),
Permissions.invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingFeed),
Permissions.invokeRpc(CordaRPCOps::nodeInfoFromParty),
Permissions.invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed),
Permissions.invokeRpc("vaultQueryBy"),
Permissions.invokeRpc("vaultTrackBy"),
Permissions.invokeRpc(CordaRPCOps::registeredFlows)
)
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]
private fun startInProcessNode(
executorService: ScheduledExecutorService,
nodeConf: NodeConfiguration,
config: Config,
cordappPackages: List<String>
): CordaFuture<Pair<StartedNode<Node>, Thread>> {
return executorService.fork {
log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
// TODO pass the version in?
val node = InProcessNode(nodeConf, MockServices.MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = nodeConf.myLegalName.organisation) {
node.internals.run()
}
node to nodeThread
}.flatMap { nodeAndThread ->
addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
}
}
private fun startOutOfProcessNode(
nodeConf: NodeConfiguration,
config: Config,
quasarJarPath: String,
debugPort: Int?,
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String,
initialRegistration: Boolean
): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator),
"java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process
"log4j2.debug" to if(debugPort != null) "true" else "false"
)
// See experimental/quasar-hook/README.md for how to generate.
val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" +
"com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" +
"com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" +
"io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" +
"org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" +
"org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" +
"org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)"
val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
val arguments = mutableListOf<String>(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell").also {
if (initialRegistration) {
it += "--initial-registration"
}
}.toList()
return ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = arguments,
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process {
val className = "net.corda.webserver.WebServer"
return ProcessUtilities.startCordaProcess(
className = className, // cannot directly get class for this, so just use string
arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()),
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
),
errorLogPath = Paths.get("error.$className.log"),
workingDirectory = null,
maximumHeapSize = maximumHeapSize
)
}
private fun getCallerPackage(): String {
return Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`?.name }
?: throw IllegalStateException("Function instantiating driver must be defined in a package.")
}
/**
* We have an alternative way of specifying classpath for spawned process: by using "-cp" option. So duplicating the setting of this
* rather long string is un-necessary and can be harmful on Windows.
*/
private fun Map<String, Any>.removeResolvedClasspath(): Map<String, Any> {
return filterNot { it.key == "java.class.path" }
}
}
}

View File

@ -52,7 +52,7 @@ import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import java.util.* import java.util.*
interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface { interface RPCDriverExposedDSLInterface : DriverDSLInternalInterface {
/** /**
* Starts an In-VM RPC server. Note that only a single one may be started. * Starts an In-VM RPC server. Note that only a single one may be started.
* *
@ -239,7 +239,7 @@ fun <A> rpcDriver(
dsl: RPCDriverExposedDSLInterface.() -> A dsl: RPCDriverExposedDSLInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = RPCDriverDSL( driverDsl = RPCDriverDSL(
DriverDSL( DriverDSLImpl(
portAllocation = portAllocation, portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation, debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties, systemProperties = systemProperties,
@ -279,7 +279,7 @@ private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityMan
} }
data class RPCDriverDSL( data class RPCDriverDSL(
private val driverDSL: DriverDSL, private val externalTrace: Trace? private val driverDSL: DriverDSLImpl, private val externalTrace: Trace?
) : DriverDSLInternalInterface by driverDSL, RPCDriverInternalDSLInterface { ) : DriverDSLInternalInterface by driverDSL, RPCDriverInternalDSLInterface {
private companion object { private companion object {
val notificationAddress = "notifications" val notificationAddress = "notifications"

View File

@ -23,6 +23,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.testing.driver.* import net.corda.testing.driver.*
import net.corda.testing.internal.DriverDSLImpl
import net.corda.testing.internal.ProcessUtilities import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -44,10 +45,10 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
/** /**
* This file defines an extension to [DriverDSL] that allows starting of verifier processes and * This file defines an extension to [DriverDSLImpl] that allows starting of verifier processes and
* lightweight verification requestors. * lightweight verification requestors.
*/ */
interface VerifierExposedDSLInterface : DriverDSLExposedInterface { interface VerifierExposedDSLInterface : DriverDSL {
/** Starts a lightweight verification requestor that implements the Node's Verifier API */ /** Starts a lightweight verification requestor that implements the Node's Verifier API */
fun startVerificationRequestor(name: CordaX500Name): CordaFuture<VerificationRequestorHandle> fun startVerificationRequestor(name: CordaX500Name): CordaFuture<VerificationRequestorHandle>
@ -88,7 +89,7 @@ fun <A> verifierDriver(
dsl: VerifierExposedDSLInterface.() -> A dsl: VerifierExposedDSLInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = VerifierDriverDSL( driverDsl = VerifierDriverDSL(
DriverDSL( DriverDSLImpl(
portAllocation = portAllocation, portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation, debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties, systemProperties = systemProperties,
@ -145,7 +146,7 @@ data class VerificationRequestorHandle(
data class VerifierDriverDSL( data class VerifierDriverDSL(
val driverDSL: DriverDSL val driverDSL: DriverDSLImpl
) : DriverDSLInternalInterface by driverDSL, VerifierInternalDSLInterface { ) : DriverDSLInternalInterface by driverDSL, VerifierInternalDSLInterface {
val verifierCount = AtomicInteger(0) val verifierCount = AtomicInteger(0)