Cleaned up Driver.kt so that only the relevant bits are exposed as public API

This commit is contained in:
Shams Asari 2017-12-02 11:19:18 +00:00
parent b0ebf3d7e0
commit 8461837f1a
27 changed files with 1138 additions and 1166 deletions

View File

@ -13,7 +13,7 @@ import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.driver.poll
import net.corda.testing.internal.poll
import net.corda.testing.internal.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After
@ -231,6 +231,7 @@ class RPCStabilityTests {
override val protocolVersion = 0
override fun ping() = "pong"
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
@ -348,7 +349,7 @@ class RPCStabilityTests {
}
fun RPCDriverExposedDSLInterface.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
fun RPCDriverDSL.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
pollUntilTrue("number of RPC clients to become $expected") {
val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
clientAddresses.size == expected

View File

@ -7,7 +7,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.RPCDriverDSL
import net.corda.testing.internal.rpcTestUser
import net.corda.testing.internal.startInVmRpcClient
import net.corda.testing.internal.startRpcClient
@ -41,7 +41,7 @@ open class AbstractRPCTest {
val createSession: () -> ClientSession
)
inline fun <reified I : RPCOps> RPCDriverExposedDSLInterface.testProxy(
inline fun <reified I : RPCOps> RPCDriverDSL.testProxy(
ops: I,
rpcUser: User = rpcTestUser,
clientConfiguration: RPCClientConfiguration = RPCClientConfiguration.default,
@ -55,9 +55,9 @@ open class AbstractRPCTest {
}
}
RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server ->
startRpcClient<I>(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) ->
startRpcClient<I>(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
}
}
}.get()

View File

@ -7,7 +7,7 @@ import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.messaging.rpcContext
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.RPCDriverDSL
import net.corda.testing.internal.rpcDriver
import net.corda.testing.internal.rpcTestUser
import org.assertj.core.api.Assertions.assertThat
@ -26,7 +26,7 @@ import kotlin.test.assertTrue
class ClientRPCInfrastructureTests : AbstractRPCTest() {
// TODO: Test that timeouts work
private fun RPCDriverExposedDSLInterface.testProxy(): TestOps {
private fun RPCDriverDSL.testProxy(): TestOps {
return testProxy<TestOps>(TestOpsImpl()).ops
}

View File

@ -1,15 +1,15 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.RPCDriverDSL
import net.corda.testing.internal.rpcDriver
import net.corda.testing.internal.testThreadFactory
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet
@ -20,7 +20,10 @@ import org.junit.runners.Parameterized
import rx.Observable
import rx.subjects.UnicastSubject
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
@RunWith(Parameterized::class)
class RPCConcurrencyTests : AbstractRPCTest() {
@ -84,7 +87,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
}
}
private fun RPCDriverExposedDSLInterface.testProxy(): TestProxy<TestOps> {
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
return testProxy<TestOps>(
TestOpsImpl(pool),
clientConfiguration = RPCClientConfiguration.default.copy(

View File

@ -5,14 +5,14 @@ import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.testing.internal.performance.div
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.measure
import net.corda.testing.internal.RPCDriverDSL
import net.corda.testing.internal.performance.div
import net.corda.testing.internal.performance.startPublishingFixedRateInjector
import net.corda.testing.internal.performance.startReporter
import net.corda.testing.internal.performance.startTightLoopInjector
import net.corda.testing.internal.rpcDriver
import net.corda.testing.measure
import org.junit.Ignore
import org.junit.Test
import org.junit.runner.RunWith
@ -42,7 +42,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
}
}
private fun RPCDriverExposedDSLInterface.testProxy(
private fun RPCDriverDSL.testProxy(
clientConfiguration: RPCClientConfiguration,
serverConfiguration: RPCServerConfiguration
): TestProxy<TestOps> {

View File

@ -5,7 +5,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.messaging.rpcContext
import net.corda.nodeapi.internal.config.User
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.RPCDriverDSL
import net.corda.testing.internal.rpcDriver
import org.junit.Test
import org.junit.runner.RunWith
@ -37,7 +37,7 @@ class RPCPermissionsTests : AbstractRPCTest() {
/**
* Create an RPC proxy for the given user.
*/
private fun RPCDriverExposedDSLInterface.testProxyFor(rpcUser: User) = testProxy<TestOps>(TestOpsImpl(), rpcUser).ops
private fun RPCDriverDSL.testProxyFor(rpcUser: User) = testProxy<TestOps>(TestOpsImpl(), rpcUser).ops
private fun userOf(name: String, permissions: Set<String>) = User(name, "password", permissions)

View File

@ -20,14 +20,17 @@ import net.corda.node.internal.SecureCordaRPCOps
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.nodeapi.internal.config.User
import net.corda.testing.*
import net.corda.testing.ALICE_NAME
import net.corda.testing.BOB_NAME
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContractV2
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.RPCDriverDSL
import net.corda.testing.internal.rpcDriver
import net.corda.testing.internal.rpcTestUser
import net.corda.testing.internal.startRpcClient
import net.corda.testing.node.MockNetwork
import net.corda.testing.singleIdentity
import net.corda.testing.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -120,7 +123,7 @@ class ContractUpgradeFlowTest {
check(bobNode)
}
private fun RPCDriverExposedDSLInterface.startProxy(node: StartedNode<*>, user: User): CordaRPCOps {
private fun RPCDriverDSL.startProxy(node: StartedNode<*>, user: User): CordaRPCOps {
return startRpcClient<CordaRPCOps>(
rpcAddress = startRpcServer(
rpcUser = user,

View File

@ -17,11 +17,12 @@ import net.corda.nodeapi.internal.config.User
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.node.NotarySpec
import net.corda.testing.internal.InternalDriverDSL
import net.corda.testing.internal.performance.div
import net.corda.testing.internal.performance.startPublishingFixedRateInjector
import net.corda.testing.internal.performance.startReporter
import net.corda.testing.internal.performance.startTightLoopInjector
import net.corda.testing.node.NotarySpec
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
@ -91,7 +92,7 @@ class NodePerformanceTests {
driver(startNodesInProcess = true) {
val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlow<EmptyFlow>())))).get()
a as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics)
val metricRegistry = startReporter((this as InternalDriverDSL).shutdownManager, a.node.services.monitoringService.metrics)
a.rpcClientToNode().use("A", "A") { connection ->
startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 2000L / TimeUnit.SECONDS) {
connection.proxy.startFlow(::EmptyFlow).returnValue.get()
@ -109,7 +110,7 @@ class NodePerformanceTests {
extraCordappPackagesToScan = listOf("net.corda.finance")
) {
val notary = defaultNotaryNode.getOrThrow() as NodeHandle.InProcess
val metricRegistry = startReporter(shutdownManager, notary.node.services.monitoringService.metrics)
val metricRegistry = startReporter((this as InternalDriverDSL).shutdownManager, notary.node.services.monitoringService.metrics)
notary.rpcClientToNode().use("A", "A") { connection ->
println("ISSUING")
val doneFutures = (1..100).toList().parallelStream().map {

View File

@ -20,13 +20,14 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.testing.*
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.DriverDSLExposedInterface
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.node.MockAttachmentStorage
import net.corda.testing.rigorousMock
import net.corda.testing.withTestSerialization
import org.junit.Assert.assertEquals
import org.junit.Test
import java.net.URLClassLoader
@ -51,16 +52,16 @@ class AttachmentLoadingTests {
Class.forName("net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator", true, URLClassLoader(arrayOf(isolatedJAR)))
.asSubclass(FlowLogic::class.java)
private fun DriverDSLExposedInterface.createTwoNodes(): List<NodeHandle> {
private fun DriverDSL.createTwoNodes(): List<NodeHandle> {
return listOf(
startNode(providedName = bankAName),
startNode(providedName = bankBName)
).transpose().getOrThrow()
}
private fun DriverDSLExposedInterface.installIsolatedCordappTo(nodeName: CordaX500Name) {
private fun DriverDSL.installIsolatedCordappTo(nodeName: CordaX500Name) {
// Copy the app jar to the first node. The second won't have it.
val path = (baseDirectory(nodeName.toString()) / "cordapps").createDirectories() / "isolated.jar"
val path = (baseDirectory(nodeName) / "cordapps").createDirectories() / "isolated.jar"
logger.info("Installing isolated jar to $path")
isolatedJAR.openStream().buffered().use { input ->
Files.newOutputStream(path).buffered().use { output ->

View File

@ -18,7 +18,7 @@ import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.ALICE
import net.corda.testing.chooseIdentity
import net.corda.testing.driver.DriverDSLExposedInterface
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.node.ClusterSpec
@ -116,13 +116,13 @@ class P2PMessagingTest {
}
}
private fun startDriverWithDistributedService(dsl: DriverDSLExposedInterface.(List<StartedNode<Node>>) -> Unit) {
private fun startDriverWithDistributedService(dsl: DriverDSL.(List<StartedNode<Node>>) -> Unit) {
driver(startNodesInProcess = true, notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2)))) {
dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as NodeHandle.InProcess).node })
}
}
private fun DriverDSLExposedInterface.startAlice(): StartedNode<Node> {
private fun DriverDSL.startAlice(): StartedNode<Node> {
return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1))
.map { (it as NodeHandle.InProcess).node }
.getOrThrow()

View File

@ -25,7 +25,7 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.driver.poll
import net.corda.testing.internal.poll
import java.io.InputStream
import java.net.HttpURLConnection
import java.net.URL

View File

@ -3,8 +3,11 @@ package net.corda.test.spring
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.utilities.contextLogger
import net.corda.testing.driver.*
import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.WebserverHandle
import net.corda.testing.internal.*
import net.corda.testing.node.NotarySpec
import okhttp3.OkHttpClient
import okhttp3.Request
@ -14,22 +17,6 @@ import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
interface SpringDriverExposedDSLInterface : DriverDSLExposedInterface {
/**
* Starts a Spring Boot application, passes the RPC connection data as parameters the process.
* Returns future which will complete after (and if) the server passes healthcheck.
* @param clazz Class with main method which is expected to run Spring application
* @param handle Corda Node handle this webapp is expected to connect to
* @param checkUrl URL path to use for server readiness check - uses [okhttp3.Response.isSuccessful] as qualifier
*
* TODO: Rather then expecting a given clazz to contain main method which start Spring app our own simple class can do this
*/
fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture<WebserverHandle>
}
interface SpringDriverInternalDSLInterface : DriverDSLInternalInterface, SpringDriverExposedDSLInterface
fun <A> springDriver(
defaultParameters: DriverParameters = DriverParameters(),
isDebug: Boolean = defaultParameters.isDebug,
@ -42,29 +29,40 @@ fun <A> springDriver(
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
notarySpecs: List<NotarySpec>,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
dsl: SpringDriverExposedDSLInterface.() -> A
) = genericDriver(
defaultParameters = defaultParameters,
isDebug = isDebug,
driverDirectory = driverDirectory,
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
useTestClock = useTestClock,
initialiseSerialization = initialiseSerialization,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs,
driverDslWrapper = { driverDSL:DriverDSL -> SpringBootDriverDSL(driverDSL) },
coerce = { it }, dsl = dsl
)
dsl: SpringBootDriverDSL.() -> A
): A {
return genericDriver(
defaultParameters = defaultParameters,
isDebug = isDebug,
driverDirectory = driverDirectory,
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
useTestClock = useTestClock,
initialiseSerialization = initialiseSerialization,
startNodesInProcess = startNodesInProcess,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs,
driverDslWrapper = { driverDSL: DriverDSLImpl -> SpringBootDriverDSL(driverDSL) },
coerce = { it }, dsl = dsl
)
}
data class SpringBootDriverDSL(private val driverDSL: DriverDSL) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface {
data class SpringBootDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL {
companion object {
private val log = contextLogger()
}
override fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture<WebserverHandle> {
/**
* Starts a Spring Boot application, passes the RPC connection data as parameters the process.
* Returns future which will complete after (and if) the server passes healthcheck.
* @param clazz Class with main method which is expected to run Spring application
* @param handle Corda Node handle this webapp is expected to connect to
* @param checkUrl URL path to use for server readiness check - uses [okhttp3.Response.isSuccessful] as qualifier
*
* TODO: Rather then expecting a given clazz to contain main method which start Spring app our own simple class can do this
*/
fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture<WebserverHandle> {
val debugPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null
val process = startApplication(handle, debugPort, clazz)
driverDSL.shutdownManager.registerProcessShutdown(process)

View File

@ -15,7 +15,7 @@ import net.corda.testing.DUMMY_BANK_B
import net.corda.testing.chooseIdentity
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import net.corda.testing.driver.poll
import net.corda.testing.internal.poll
import net.corda.traderdemo.flow.BuyerFlow
import net.corda.traderdemo.flow.CommercialPaperIssueFlow
import net.corda.traderdemo.flow.SellerFlow

View File

@ -10,6 +10,8 @@ import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_REGULATOR
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import net.corda.testing.internal.addressMustBeBound
import net.corda.testing.internal.addressMustNotBeBound
import net.corda.testing.node.NotarySpec
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
@ -74,7 +76,7 @@ class DriverTests {
}
val baseDirectory = driver(notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name))) {
(this as DriverDSL).baseDirectory(DUMMY_NOTARY.name)
baseDirectory(DUMMY_NOTARY.name)
}
assertThat(baseDirectory / "process-id").doesNotExist()
}

View File

@ -2,202 +2,35 @@
package net.corda.testing.driver
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NotaryService
import net.corda.core.toFuture
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.config.*
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.toConfig
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.*
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.internal.InProcessNode
import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.internal.DriverDSLImpl
import net.corda.testing.internal.genericDriver
import net.corda.testing.internal.getTimestampAsDirectoryName
import net.corda.testing.node.NotarySpec
import okhttp3.OkHttpClient
import okhttp3.Request
import org.slf4j.Logger
import rx.Observable
import rx.observables.ConnectableObservable
import java.net.*
import java.net.InetSocketAddress
import java.net.ServerSocket
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
*
* The process the driver is run in behaves as an Artemis client and starts up other processes.
*
* TODO this file is getting way too big, it should be split into several files.
*/
private val log: Logger = loggerFor<DriverDSL>()
private val DEFAULT_POLL_INTERVAL = 500.millis
private const val DEFAULT_WARN_COUNT = 120
/**
* A sub-set of permissions that grant most of the essential operations used in the unit/integration tests as well as
* in demo application like NodeExplorer.
*/
private val DRIVER_REQUIRED_PERMISSIONS = setOf(
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::networkMapFeed),
invokeRpc(CordaRPCOps::networkMapSnapshot),
invokeRpc(CordaRPCOps::notaryIdentities),
invokeRpc(CordaRPCOps::stateMachinesFeed),
invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingFeed),
invokeRpc(CordaRPCOps::nodeInfoFromParty),
invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed),
invokeRpc("vaultQueryBy"),
invokeRpc("vaultTrackBy"),
invokeRpc(CordaRPCOps::registeredFlows)
)
/**
* Object ecapsulating a notary started automatically by the driver.
*/
data class NotaryHandle(val identity: Party, val validating: Boolean, val nodeHandles: CordaFuture<List<NodeHandle>>)
/**
* This is the interface that's exposed to DSL users.
*/
interface DriverDSLExposedInterface : CordformContext {
/** Returns a list of [NotaryHandle]s matching the list of [NotarySpec]s passed into [driver]. */
val notaryHandles: List<NotaryHandle>
/**
* Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one.
* @see notaryHandles
*/
val defaultNotaryHandle: NotaryHandle get() {
return when (notaryHandles.size) {
0 -> throw IllegalStateException("There are no notaries defined on the network")
1 -> notaryHandles[0]
else -> throw IllegalStateException("There is more than one notary defined on the network")
}
}
/**
* Returns the identity of the single notary on the network. Throws if there are none or more than one.
* @see defaultNotaryHandle
*/
val defaultNotaryIdentity: Party get() = defaultNotaryHandle.identity
/**
* Returns a [CordaFuture] on the [NodeHandle] for the single-node notary on the network. Throws if there
* are no notaries or more than one, or if the notary is a distributed cluster.
* @see defaultNotaryHandle
* @see notaryHandles
*/
val defaultNotaryNode: CordaFuture<NodeHandle> get() {
return defaultNotaryHandle.nodeHandles.map {
it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node")
}
}
/**
* Start a node.
*
* @param defaultParameters The default parameters for the node. Allows the node to be configured in builder style
* when called from Java code.
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available.
*/
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),
providedName: CordaX500Name? = defaultParameters.providedName,
rpcUsers: List<User> = defaultParameters.rpcUsers,
verifierType: VerifierType = defaultParameters.verifierType,
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture<NodeHandle>
/**
* Helper function for starting a [Node] with custom parameters from Java.
*
* @param parameters The default parameters for the driver.
* @return [NodeHandle] that will be available sometime in the future.
*/
fun startNode(parameters: NodeParameters): CordaFuture<NodeHandle> = startNode(defaultParameters = parameters)
/** Call [startWebserver] with a default maximumHeapSize. */
fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle> = startWebserver(handle, "200m")
/**
* Starts a web server for a node
* @param handle The handle for the node that this webserver connects to via RPC.
* @param maximumHeapSize Argument for JVM -Xmx option e.g. "200m".
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
/**
* Polls a function until it returns a non-null value. Note that there is no timeout on the polling.
*
* @param pollName A description of what is being polled.
* @param pollInterval The interval of polling.
* @param warnCount The number of polls after the Driver gives a warning.
* @param check The function being polled.
* @return A future that completes with the non-null value [check] has returned.
*/
fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> A?): CordaFuture<A>
/**
* Polls the given function until it returns true.
* @see pollUntilNonNull
*/
fun pollUntilTrue(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> Boolean): CordaFuture<Unit> {
return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null }
}
val shutdownManager: ShutdownManager
}
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start()
fun shutdown()
}
sealed class NodeHandle {
abstract val nodeInfo: NodeInfo
/**
@ -319,9 +152,9 @@ data class NodeParameters(
* @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty.
* @param useTestClock If true the test clock will be used in Node.
* @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or
* not. Note that this may be overridden in [DriverDSLExposedInterface.startNode].
* not. Note that this may be overridden in [DriverDSL.startNode].
* @param notarySpecs The notaries advertised for this network. These nodes will be started automatically and will be
* available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary.
* available from [DriverDSL.notaryHandles]. Defaults to a simple validating notary.
* @param dsl The dsl itself.
* @return The value returned in the [dsl] closure.
*/
@ -335,13 +168,13 @@ fun <A> driver(
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish,
waitForAllNodesToFinish: Boolean = defaultParameters.waitForAllNodesToFinish,
notarySpecs: List<NotarySpec> = defaultParameters.notarySpecs,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
dsl: DriverDSLExposedInterface.() -> A
dsl: DriverDSL.() -> A
): A {
return genericDriver(
driverDsl = DriverDSL(
driverDsl = DriverDSLImpl(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
@ -368,7 +201,7 @@ fun <A> driver(
*/
fun <A> driver(
parameters: DriverParameters,
dsl: DriverDSLExposedInterface.() -> A
dsl: DriverDSL.() -> A
): A {
return driver(defaultParameters = parameters, dsl = dsl)
}
@ -384,7 +217,7 @@ data class DriverParameters(
val useTestClock: Boolean = false,
val initialiseSerialization: Boolean = true,
val startNodesInProcess: Boolean = false,
val waitForNodesToFinish: Boolean = false,
val waitForAllNodesToFinish: Boolean = false,
val notarySpecs: List<NotarySpec> = listOf(NotarySpec(DUMMY_NOTARY.name)),
val extraCordappPackagesToScan: List<String> = emptyList()
) {
@ -396,643 +229,7 @@ data class DriverParameters(
fun setUseTestClock(useTestClock: Boolean) = copy(useTestClock = useTestClock)
fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization)
fun setStartNodesInProcess(startNodesInProcess: Boolean) = copy(startNodesInProcess = startNodesInProcess)
fun setTerminateNodesOnShutdown(terminateNodesOnShutdown: Boolean) = copy(waitForNodesToFinish = terminateNodesOnShutdown)
fun setWaitForAllNodesToFinish(waitForAllNodesToFinish: Boolean) = copy(waitForAllNodesToFinish = waitForAllNodesToFinish)
fun setExtraCordappPackagesToScan(extraCordappPackagesToScan: List<String>) = copy(extraCordappPackagesToScan = extraCordappPackagesToScan)
fun setNotarySpecs(notarySpecs: List<NotarySpec>) = copy(notarySpecs = notarySpecs)
}
/**
* This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface
* interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface
*
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver(
driverDsl: D,
initialiseSerialization: Boolean = true,
coerce: (D) -> DI,
dsl: DI.() -> A
): A {
val serializationEnv = setGlobalSerialization(initialiseSerialization)
val shutdownHook = addShutdownHook(driverDsl::shutdown)
try {
driverDsl.start()
return dsl(coerce(driverDsl))
} catch (exception: Throwable) {
log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
shutdownHook.cancel()
serializationEnv.unset()
}
}
/**
* This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface
* interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface
*
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver(
defaultParameters: DriverParameters = DriverParameters(),
isDebug: Boolean = defaultParameters.isDebug,
driverDirectory: Path = defaultParameters.driverDirectory,
portAllocation: PortAllocation = defaultParameters.portAllocation,
debugPortAllocation: PortAllocation = defaultParameters.debugPortAllocation,
systemProperties: Map<String, String> = defaultParameters.systemProperties,
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
waitForNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
notarySpecs: List<NotarySpec>,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
driverDslWrapper: (DriverDSL) -> D,
coerce: (D) -> DI, dsl: DI.() -> A
): A {
val serializationEnv = setGlobalSerialization(initialiseSerialization)
val driverDsl = driverDslWrapper(
DriverDSL(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
)
)
val shutdownHook = addShutdownHook(driverDsl::shutdown)
try {
driverDsl.start()
return dsl(coerce(driverDsl))
} catch (exception: Throwable) {
log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
shutdownHook.cancel()
serializationEnv.unset()
}
}
fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
}
class ListenProcessDeathException(hostAndPort: NetworkHostAndPort, listenProcess: Process) :
CordaException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")
/**
* @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended.
*/
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null) {
addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow()
}
fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null): CordaFuture<Unit> {
return poll(executorService, "address $hostAndPort to bind") {
if (listenProcess != null && !listenProcess.isAlive) {
throw ListenProcessDeathException(hostAndPort, listenProcess)
}
try {
Socket(hostAndPort.host, hostAndPort.port).close()
Unit
} catch (_exception: SocketException) {
null
}
}
}
/*
* The default timeout value of 40 seconds have been chosen based on previous node shutdown time estimate.
* It's been observed that nodes can take up to 30 seconds to shut down, so just to stay on the safe side the 60 seconds
* timeout has been chosen.
*/
fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, timeout: Duration = 40.seconds) {
addressMustNotBeBoundFuture(executorService, hostAndPort).getOrThrow(timeout)
}
fun addressMustNotBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort): CordaFuture<Unit> {
return poll(executorService, "address $hostAndPort to unbind") {
try {
Socket(hostAndPort.host, hostAndPort.port).close()
null
} catch (_exception: SocketException) {
Unit
}
}
}
fun <A> poll(
executorService: ScheduledExecutorService,
pollName: String,
pollInterval: Duration = 500.millis,
warnCount: Int = 120,
check: () -> A?
): CordaFuture<A> {
val resultFuture = openFuture<A>()
val task = object : Runnable {
var counter = -1
override fun run() {
if (resultFuture.isCancelled) return // Give up, caller can no longer get the result.
if (++counter == warnCount) {
log.warn("Been polling $pollName for ${(pollInterval * warnCount.toLong()).seconds} seconds...")
}
try {
val checkResult = check()
if (checkResult != null) {
resultFuture.set(checkResult)
} else {
executorService.schedule(this, pollInterval.toMillis(), MILLISECONDS)
}
} catch (t: Throwable) {
resultFuture.setException(t)
}
}
}
executorService.submit(task) // The check may be expensive, so always run it in the background even the first time.
return resultFuture
}
class DriverDSL(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val systemProperties: Map<String, String>,
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val startNodesInProcess: Boolean,
val waitForNodesToFinish: Boolean,
extraCordappPackagesToScan: List<String>,
val notarySpecs: List<NotarySpec>
) : DriverDSLInternalInterface {
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
private val nodeInfoFilesCopier = NodeInfoFilesCopier()
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
private lateinit var _notaries: List<NotaryHandle>
override val notaryHandles: List<NotaryHandle> get() = _notaries
class State {
val processes = ArrayList<Process>()
}
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy {
val cl = ClassLoader.getSystemClassLoader()
val urls = (cl as URLClassLoader).urLs
val quasarPattern = ".*quasar.*\\.jar$".toRegex()
val quasarFileUrl = urls.first { quasarPattern.matches(it.path) }
Paths.get(quasarFileUrl.toURI()).toString()
}
override fun shutdown() {
if (waitForNodesToFinish) {
state.locked {
processes.forEach { it.waitFor() }
}
}
_shutdownManager?.shutdown()
_executorService?.shutdownNow()
}
private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.rpcAddress!!
val client = CordaRPCClient(rpcAddress)
val connectionFuture = poll(executorService, "RPC connection") {
try {
client.start(config.rpcUsers[0].username, config.rpcUsers[0].password)
} catch (e: Exception) {
if (processDeathFuture.isDone) throw e
log.error("Exception $e, Retrying RPC connection at $rpcAddress")
null
}
}
return firstOf(connectionFuture, processDeathFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(rpcAddress, processDeathFuture.getOrThrow())
}
val connection = connectionFuture.getOrThrow()
shutdownManager.registerShutdown(connection::close)
connection.proxy
}
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String
): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
)
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
}
internal fun startCordformNode(cordform: CordformNode): CordaFuture<NodeHandle> {
val name = CordaX500Name.parse(cordform.name)
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap()
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
val rpcUsers = cordform.rpcUsers
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = cordform.config + rpcAddress + notary + mapOf(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
)
)
return startNodeInternal(config, webAddress, null, "200m")
}
private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle {
val protocol = if (handle.configuration.useHTTPS) "https://" else "http://"
val url = URL("$protocol${handle.webAddress}/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, SECONDS).readTimeout(60, SECONDS).build()
while (process.isAlive) try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return WebserverHandle(handle.webAddress, process)
}
} catch (e: ConnectException) {
log.debug("Retrying webserver info at ${handle.webAddress}")
}
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
}
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = DriverDSL.startWebserver(handle, debugPort, maximumHeapSize)
shutdownManager.registerProcessShutdown(process)
val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process)
return webReadyFuture.map { queryWebserver(handle, process) }
}
override fun start() {
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
val notaryInfos = generateNotaryIdentities()
val nodeHandles = startNotaries()
_notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) }
}
private fun generateNotaryIdentities(): List<Pair<Party, Boolean>> {
return notarySpecs.map { spec ->
val identity = if (spec.cluster == null) {
ServiceIdentityGenerator.generateToDisk(
dirs = listOf(baseDirectory(spec.name)),
serviceName = spec.name.copy(commonName = NotaryService.constructId(validating = spec.validating))
)
} else {
ServiceIdentityGenerator.generateToDisk(
dirs = generateNodeNames(spec).map { baseDirectory(it) },
serviceName = spec.name
)
}
Pair(identity, spec.validating)
}
}
private fun generateNodeNames(spec: NotarySpec): List<CordaX500Name> {
return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(commonName = null, organisation = "${spec.name.organisation}-$it") }
}
private fun startNotaries(): List<CordaFuture<List<NodeHandle>>> {
return notarySpecs.map {
when {
it.cluster == null -> startSingleNotary(it)
it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it)
else -> throw IllegalArgumentException("BFT-SMaRt not supported")
}
}
}
// TODO This mapping is done is several places including the gradle plugin. In general we need a better way of
// generating the configs for the nodes, probably making use of Any.toConfig()
private fun NotaryConfig.toConfigMap(): Map<String, Any> = mapOf("notary" to toConfig().root().unwrapped())
private fun startSingleNotary(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
return startNode(
providedName = spec.name,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = NotaryConfig(spec.validating).toConfigMap()
).map { listOf(it) }
}
private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = NotaryConfig(
validating = spec.validating,
raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses))
return config.toConfigMap()
}
val nodeNames = generateNodeNames(spec)
val clusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNodeFuture = startNode(
providedName = nodeNames[0],
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(clusterAddress) + mapOf(
"database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
// All other nodes will join the cluster
val restNodeFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
startNode(
providedName = it,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf(
"database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
}
return firstNodeFuture.flatMap { first ->
restNodeFutures.transpose().map { rest -> listOf(first) + rest }
}
}
fun baseDirectory(nodeName: CordaX500Name): Path {
val nodeDirectoryName = nodeName.organisation.filter { !it.isWhitespace() }
return driverDirectory / nodeDirectoryName
}
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
private fun startNodeInternal(config: Config,
webAddress: NetworkHostAndPort,
startInProcess: Boolean?,
maximumHeapSize: String): CordaFuture<NodeHandle> {
val configuration = config.parseAsNodeConfiguration()
val baseDirectory = configuration.baseDirectory.createDirectories()
nodeInfoFilesCopier.addConfig(baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier.removeConfig(baseDirectory)
countObservables.remove(configuration.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages)
shutdownManager.registerShutdown(
nodeAndThreadFuture.map { (node, thread) ->
{
node.dispose()
thread.interrupt()
}
}
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(configuration, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit)
}
}
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize)
if (waitForNodesToFinish) {
state.locked {
processes += process
}
} else {
shutdownManager.registerProcessShutdown(process)
}
val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process)
return p2pReadyFuture.flatMap {
val processDeathFuture = poll(executorService, "process death") {
if (process.isAlive) null else process
}
establishRpc(configuration, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(configuration.p2pAddress, process)
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process,
onNodeExit)
}
}
}
}
}
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture<A> {
val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check)
shutdownManager.registerShutdown { pollFuture.cancel(true) }
return pollFuture
}
companion object {
private val defaultRpcUserList = listOf(User("default", "default", setOf("ALL")).toConfig().root().unwrapped())
private val names = arrayOf(
ALICE.name,
BOB.name,
DUMMY_BANK_A.name
)
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]
private fun startInProcessNode(
executorService: ScheduledExecutorService,
nodeConf: NodeConfiguration,
config: Config,
cordappPackages: List<String>
): CordaFuture<Pair<StartedNode<Node>, Thread>> {
return executorService.fork {
log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
// TODO pass the version in?
val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = nodeConf.myLegalName.organisation) {
node.internals.run()
}
node to nodeThread
}.flatMap {
nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
}
}
private fun startOutOfProcessNode(
nodeConf: NodeConfiguration,
config: Config,
quasarJarPath: String,
debugPort: Int?,
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String
): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator),
"java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process
"log4j2.debug" to if(debugPort != null) "true" else "false"
)
// See experimental/quasar-hook/README.md for how to generate.
val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" +
"com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" +
"com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" +
"io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" +
"org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" +
"org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" +
"org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)"
val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
return ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell"
),
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process {
val className = "net.corda.webserver.WebServer"
return ProcessUtilities.startCordaProcess(
className = className, // cannot directly get class for this, so just use string
arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()),
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
),
errorLogPath = Paths.get("error.$className.log"),
workingDirectory = null,
maximumHeapSize = maximumHeapSize
)
}
private fun getCallerPackage(): String {
return Exception()
.stackTrace
.first { it.fileName != "Driver.kt" }
.let { Class.forName(it.className).`package`?.name }
?: throw IllegalStateException("Function instantiating driver must be defined in a package.")
}
/**
* We have an alternative way of specifying classpath for spawned process: by using "-cp" option. So duplicating the setting of this
* rather long string is un-necessary and can be harmful on Windows.
*/
private fun Map<String, Any>.removeResolvedClasspath(): Map<String, Any> {
return filterNot { it.key == "java.class.path" }
}
}
}
fun writeConfig(path: Path, filename: String, config: Config) {
val configString = config.root().render(ConfigRenderOptions.defaults())
configString.byteInputStream().copyTo(path / filename, REPLACE_EXISTING)
}

View File

@ -0,0 +1,92 @@
package net.corda.testing.driver
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.node.internal.Node
import net.corda.node.services.config.VerifierType
import net.corda.nodeapi.internal.config.User
import net.corda.testing.node.NotarySpec
import java.nio.file.Path
interface DriverDSL {
/** Returns a list of [NotaryHandle]s matching the list of [NotarySpec]s passed into [driver]. */
val notaryHandles: List<NotaryHandle>
/**
* Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one.
* @see notaryHandles
*/
val defaultNotaryHandle: NotaryHandle get() {
return when (notaryHandles.size) {
0 -> throw IllegalStateException("There are no notaries defined on the network")
1 -> notaryHandles[0]
else -> throw IllegalStateException("There is more than one notary defined on the network")
}
}
/**
* Returns the identity of the single notary on the network. Throws if there are none or more than one.
* @see defaultNotaryHandle
*/
val defaultNotaryIdentity: Party get() = defaultNotaryHandle.identity
/**
* Returns a [CordaFuture] on the [NodeHandle] for the single-node notary on the network. Throws if there
* are no notaries or more than one, or if the notary is a distributed cluster.
* @see defaultNotaryHandle
* @see notaryHandles
*/
val defaultNotaryNode: CordaFuture<NodeHandle> get() {
return defaultNotaryHandle.nodeHandles.map {
it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node")
}
}
/**
* Start a node.
*
* @param defaultParameters The default parameters for the node. Allows the node to be configured in builder style
* when called from Java code.
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param verifierType The type of transaction verifier to use. See: [VerifierType]
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @param startInSameProcess Determines if the node should be started inside the same process the Driver is running
* in. If null the Driver-level value will be used.
* @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available.
*/
fun startNode(
defaultParameters: NodeParameters = NodeParameters(),
providedName: CordaX500Name? = defaultParameters.providedName,
rpcUsers: List<User> = defaultParameters.rpcUsers,
verifierType: VerifierType = defaultParameters.verifierType,
customOverrides: Map<String, Any?> = defaultParameters.customOverrides,
startInSameProcess: Boolean? = defaultParameters.startInSameProcess,
maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture<NodeHandle>
/**
* Helper function for starting a [Node] with custom parameters from Java.
*
* @param parameters The default parameters for the driver.
* @return [NodeHandle] that will be available sometime in the future.
*/
fun startNode(parameters: NodeParameters): CordaFuture<NodeHandle> = startNode(defaultParameters = parameters)
/** Call [startWebserver] with a default maximumHeapSize. */
fun startWebserver(handle: NodeHandle): CordaFuture<WebserverHandle> = startWebserver(handle, "200m")
/**
* Starts a web server for a node
* @param handle The handle for the node that this webserver connects to via RPC.
* @param maximumHeapSize Argument for JVM -Xmx option e.g. "200m".
*/
fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle>
/**
* Returns the base directory for a node with the given [CordaX500Name]. This method is useful if the base directory
* is needed before the node is started.
*/
fun baseDirectory(nodeName: CordaX500Name): Path
}

View File

@ -0,0 +1,683 @@
package net.corda.testing.internal
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient
import net.corda.cordform.CordformContext
import net.corda.cordform.CordformNode
import net.corda.core.concurrent.CordaFuture
import net.corda.core.concurrent.firstOf
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.*
import net.corda.core.internal.copyTo
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NotaryService
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions
import net.corda.node.services.config.*
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.nodeapi.NodeInfoFilesCopier
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.toConfig
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.driver.*
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.NotarySpec
import net.corda.testing.setGlobalSerialization
import okhttp3.OkHttpClient
import okhttp3.Request
import rx.Observable
import rx.observables.ConnectableObservable
import java.net.ConnectException
import java.net.URL
import java.net.URLClassLoader
import java.nio.file.Path
import java.nio.file.Paths
import java.nio.file.StandardCopyOption
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
class DriverDSLImpl(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val systemProperties: Map<String, String>,
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean,
val startNodesInProcess: Boolean,
val waitForNodesToFinish: Boolean,
extraCordappPackagesToScan: List<String>,
val notarySpecs: List<NotarySpec>
) : InternalDriverDSL {
private var _executorService: ScheduledExecutorService? = null
val executorService get() = _executorService!!
private var _shutdownManager: ShutdownManager? = null
override val shutdownManager get() = _shutdownManager!!
private val cordappPackages = extraCordappPackagesToScan + getCallerPackage()
// TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/
// This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system.
// Investigate whether we can avoid that.
private val nodeInfoFilesCopier = NodeInfoFilesCopier()
// Map from a nodes legal name to an observable emitting the number of nodes in its network map.
private val countObservables = mutableMapOf<CordaX500Name, Observable<Int>>()
private lateinit var _notaries: List<NotaryHandle>
override val notaryHandles: List<NotaryHandle> get() = _notaries
class State {
val processes = ArrayList<Process>()
}
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy {
val cl = ClassLoader.getSystemClassLoader()
val urls = (cl as URLClassLoader).urLs
val quasarPattern = ".*quasar.*\\.jar$".toRegex()
val quasarFileUrl = urls.first { quasarPattern.matches(it.path) }
Paths.get(quasarFileUrl.toURI()).toString()
}
override fun shutdown() {
if (waitForNodesToFinish) {
state.locked {
processes.forEach { it.waitFor() }
}
}
_shutdownManager?.shutdown()
_executorService?.shutdownNow()
}
private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.rpcAddress!!
val client = CordaRPCClient(rpcAddress)
val connectionFuture = poll(executorService, "RPC connection") {
try {
client.start(config.rpcUsers[0].username, config.rpcUsers[0].password)
} catch (e: Exception) {
if (processDeathFuture.isDone) throw e
log.error("Exception $e, Retrying RPC connection at $rpcAddress")
null
}
}
return firstOf(connectionFuture, processDeathFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(rpcAddress, processDeathFuture.getOrThrow())
}
val connection = connectionFuture.getOrThrow()
shutdownManager.registerShutdown(connection::close)
connection.proxy
}
}
override fun startNode(
defaultParameters: NodeParameters,
providedName: CordaX500Name?,
rpcUsers: List<User>,
verifierType: VerifierType,
customOverrides: Map<String, Any?>,
startInSameProcess: Boolean?,
maximumHeapSize: String
): CordaFuture<NodeHandle> {
val p2pAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
// TODO: Derive name from the full picked name, don't just wrap the common name
val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB")
val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = configOf(
"myLegalName" to name.toString(),
"p2pAddress" to p2pAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"useTestClock" to useTestClock,
"rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() },
"verifierType" to verifierType.name
) + customOverrides
)
return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize)
}
internal fun startCordformNode(cordform: CordformNode): CordaFuture<NodeHandle> {
val name = CordaX500Name.parse(cordform.name)
// TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal
val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap()
val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort()
val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap()
val rpcUsers = cordform.rpcUsers
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory(name),
allowMissingConfig = true,
configOverrides = cordform.config + rpcAddress + notary + mapOf(
"rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers
)
)
return startNodeInternal(config, webAddress, null, "200m")
}
private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle {
val protocol = if (handle.configuration.useHTTPS) "https://" else "http://"
val url = URL("$protocol${handle.webAddress}/api/status")
val client = OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build()
while (process.isAlive) try {
val response = client.newCall(Request.Builder().url(url).build()).execute()
if (response.isSuccessful && (response.body().string() == "started")) {
return WebserverHandle(handle.webAddress, process)
}
} catch (e: ConnectException) {
log.debug("Retrying webserver info at ${handle.webAddress}")
}
throw IllegalStateException("Webserver at ${handle.webAddress} has died")
}
override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture<WebserverHandle> {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startWebserver(handle, debugPort, maximumHeapSize)
shutdownManager.registerProcessShutdown(process)
val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process)
return webReadyFuture.map { queryWebserver(handle, process) }
}
override fun start() {
_executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
_shutdownManager = ShutdownManager(executorService)
shutdownManager.registerShutdown { nodeInfoFilesCopier.close() }
val notaryInfos = generateNotaryIdentities()
val nodeHandles = startNotaries()
_notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) }
}
private fun generateNotaryIdentities(): List<Pair<Party, Boolean>> {
return notarySpecs.map { spec ->
val identity = if (spec.cluster == null) {
ServiceIdentityGenerator.generateToDisk(
dirs = listOf(baseDirectory(spec.name)),
serviceName = spec.name.copy(commonName = NotaryService.constructId(validating = spec.validating))
)
} else {
ServiceIdentityGenerator.generateToDisk(
dirs = generateNodeNames(spec).map { baseDirectory(it) },
serviceName = spec.name
)
}
Pair(identity, spec.validating)
}
}
private fun generateNodeNames(spec: NotarySpec): List<CordaX500Name> {
return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(commonName = null, organisation = "${spec.name.organisation}-$it") }
}
private fun startNotaries(): List<CordaFuture<List<NodeHandle>>> {
return notarySpecs.map {
when {
it.cluster == null -> startSingleNotary(it)
it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it)
else -> throw IllegalArgumentException("BFT-SMaRt not supported")
}
}
}
// TODO This mapping is done is several places including the gradle plugin. In general we need a better way of
// generating the configs for the nodes, probably making use of Any.toConfig()
private fun NotaryConfig.toConfigMap(): Map<String, Any> = mapOf("notary" to toConfig().root().unwrapped())
private fun startSingleNotary(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
return startNode(
providedName = spec.name,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = NotaryConfig(spec.validating).toConfigMap()
).map { listOf(it) }
}
private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture<List<NodeHandle>> {
fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map<String, Any> {
val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList()
val config = NotaryConfig(
validating = spec.validating,
raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses))
return config.toConfigMap()
}
val nodeNames = generateNodeNames(spec)
val clusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster
val firstNodeFuture = startNode(
providedName = nodeNames[0],
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(clusterAddress) + mapOf(
"database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
// All other nodes will join the cluster
val restNodeFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort()
startNode(
providedName = it,
rpcUsers = spec.rpcUsers,
verifierType = spec.verifierType,
customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf(
"database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "")
)
)
}
return firstNodeFuture.flatMap { first ->
restNodeFutures.transpose().map { rest -> listOf(first) + rest }
}
}
override fun baseDirectory(nodeName: CordaX500Name): Path {
val nodeDirectoryName = nodeName.organisation.filter { !it.isWhitespace() }
return driverDirectory / nodeDirectoryName
}
/**
* @param initial number of nodes currently in the network map of a running node.
* @param networkMapCacheChangeObservable an observable returning the updates to the node network map.
* @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes
* the initial value emitted is always [initial]
*/
private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable<NetworkMapCache.MapChange>):
ConnectableObservable<Int> {
val count = AtomicInteger(initial)
return networkMapCacheChangeObservable.map { it ->
when (it) {
is NetworkMapCache.MapChange.Added -> count.incrementAndGet()
is NetworkMapCache.MapChange.Removed -> count.decrementAndGet()
is NetworkMapCache.MapChange.Modified -> count.get()
}
}.startWith(initial).replay()
}
/**
* @param rpc the [CordaRPCOps] of a newly started node.
* @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes
* equal to the number of running nodes. The future will yield the number of connected nodes.
*/
private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture<Int> {
val (snapshot, updates) = rpc.networkMapFeed()
val counterObservable = nodeCountObservable(snapshot.size, updates)
countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable
/* TODO: this might not always be the exact number of nodes one has to wait for,
* for example in the following sequence
* 1 start 3 nodes in order, A, B, C.
* 2 before the future returned by this function resolves, kill B
* At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes.
*/
val requiredNodes = countObservables.size
// This is an observable which yield the minimum number of nodes in each node network map.
val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array<Any> ->
args.map { it as Int }.min() ?: 0
}
val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture()
counterObservable.connect()
return future
}
private fun startNodeInternal(config: Config,
webAddress: NetworkHostAndPort,
startInProcess: Boolean?,
maximumHeapSize: String): CordaFuture<NodeHandle> {
val configuration = config.parseAsNodeConfiguration()
val baseDirectory = configuration.baseDirectory.createDirectories()
nodeInfoFilesCopier.addConfig(baseDirectory)
val onNodeExit: () -> Unit = {
nodeInfoFilesCopier.removeConfig(baseDirectory)
countObservables.remove(configuration.myLegalName)
}
if (startInProcess ?: startNodesInProcess) {
val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages)
shutdownManager.registerShutdown(
nodeAndThreadFuture.map { (node, thread) ->
{
node.dispose()
thread.interrupt()
}
}
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(configuration, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit)
}
}
}
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize)
if (waitForNodesToFinish) {
state.locked {
processes += process
}
} else {
shutdownManager.registerProcessShutdown(process)
}
val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process)
return p2pReadyFuture.flatMap {
val processDeathFuture = poll(executorService, "process death") {
if (process.isAlive) null else process
}
establishRpc(configuration, processDeathFuture).flatMap { rpc ->
// Check for all nodes to have all other nodes in background in case RPC is failing over:
val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it }
firstOf(processDeathFuture, networkMapFuture) {
if (it == processDeathFuture) {
throw ListenProcessDeathException(configuration.p2pAddress, process)
}
processDeathFuture.cancel(false)
log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress")
NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process,
onNodeExit)
}
}
}
}
}
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture<A> {
val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check)
shutdownManager.registerShutdown { pollFuture.cancel(true) }
return pollFuture
}
companion object {
internal val log = contextLogger()
private val defaultRpcUserList = listOf(User("default", "default", setOf("ALL")).toConfig().root().unwrapped())
private val names = arrayOf(
ALICE.name,
BOB.name,
DUMMY_BANK_A.name
)
/**
* A sub-set of permissions that grant most of the essential operations used in the unit/integration tests as well as
* in demo application like NodeExplorer.
*/
private val DRIVER_REQUIRED_PERMISSIONS = setOf(
Permissions.invokeRpc(CordaRPCOps::nodeInfo),
Permissions.invokeRpc(CordaRPCOps::networkMapFeed),
Permissions.invokeRpc(CordaRPCOps::networkMapSnapshot),
Permissions.invokeRpc(CordaRPCOps::notaryIdentities),
Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed),
Permissions.invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingFeed),
Permissions.invokeRpc(CordaRPCOps::nodeInfoFromParty),
Permissions.invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed),
Permissions.invokeRpc("vaultQueryBy"),
Permissions.invokeRpc("vaultTrackBy"),
Permissions.invokeRpc(CordaRPCOps::registeredFlows)
)
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]
private fun startInProcessNode(
executorService: ScheduledExecutorService,
nodeConf: NodeConfiguration,
config: Config,
cordappPackages: List<String>
): CordaFuture<Pair<StartedNode<Node>, Thread>> {
return executorService.fork {
log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}")
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
// TODO pass the version in?
val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start()
val nodeThread = thread(name = nodeConf.myLegalName.organisation) {
node.internals.run()
}
node to nodeThread
}.flatMap {
nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread }
}
}
private fun startOutOfProcessNode(
nodeConf: NodeConfiguration,
config: Config,
quasarJarPath: String,
debugPort: Int?,
overriddenSystemProperties: Map<String, String>,
cordappPackages: List<String>,
maximumHeapSize: String
): Process {
log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled"))
// Write node.conf
writeConfig(nodeConf.baseDirectory, "node.conf", config)
val systemProperties = overriddenSystemProperties + mapOf(
"name" to nodeConf.myLegalName,
"visualvm.display.name" to "corda-${nodeConf.myLegalName}",
Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator),
"java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process
"log4j2.debug" to if(debugPort != null) "true" else "false"
)
// See experimental/quasar-hook/README.md for how to generate.
val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" +
"com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" +
"com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" +
"io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" +
"org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" +
"org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" +
"org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)"
val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } +
"-javaagent:$quasarJarPath=$excludePattern"
val loggingLevel = if (debugPort == null) "INFO" else "DEBUG"
return ProcessUtilities.startCordaProcess(
className = "net.corda.node.Corda", // cannot directly get class for this, so just use string
arguments = listOf(
"--base-directory=${nodeConf.baseDirectory}",
"--logging-level=$loggingLevel",
"--no-local-shell"
),
jdwpPort = debugPort,
extraJvmArguments = extraJvmArguments,
errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory,
maximumHeapSize = maximumHeapSize
)
}
private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process {
val className = "net.corda.webserver.WebServer"
return ProcessUtilities.startCordaProcess(
className = className, // cannot directly get class for this, so just use string
arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()),
jdwpPort = debugPort,
extraJvmArguments = listOf(
"-Dname=node-${handle.configuration.p2pAddress}-webserver",
"-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process
),
errorLogPath = Paths.get("error.$className.log"),
workingDirectory = null,
maximumHeapSize = maximumHeapSize
)
}
/**
* Get the package of the caller to the driver so that it can be added to the list of packages the nodes will scan.
* This makes the driver automatically pick the CorDapp module that it's run from.
*
* This returns List<String> rather than String? to make it easier to bolt onto extraCordappPackagesToScan.
*/
private fun getCallerPackage(): List<String> {
val stackTrace = Throwable().stackTrace
val index = stackTrace.indexOfLast { it.className == "net.corda.testing.driver.Driver" }
// In this case we're dealing with the the RPCDriver or one of it's cousins which are internal and we don't care about them
if (index == -1) return emptyList()
val callerPackage = Class.forName(stackTrace[index + 1].className).`package` ?:
throw IllegalStateException("Function instantiating driver must be defined in a package.")
return listOf(callerPackage.name)
}
/**
* We have an alternative way of specifying classpath for spawned process: by using "-cp" option. So duplicating the setting of this
* rather long string is un-necessary and can be harmful on Windows.
*/
private fun Map<String, Any>.removeResolvedClasspath(): Map<String, Any> {
return filterNot { it.key == "java.class.path" }
}
}
}
interface InternalDriverDSL : DriverDSL, CordformContext {
private companion object {
private val DEFAULT_POLL_INTERVAL = 500.millis
private const val DEFAULT_WARN_COUNT = 120
}
val shutdownManager: ShutdownManager
override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName))
/**
* Polls a function until it returns a non-null value. Note that there is no timeout on the polling.
*
* @param pollName A description of what is being polled.
* @param pollInterval The interval of polling.
* @param warnCount The number of polls after the Driver gives a warning.
* @param check The function being polled.
* @return A future that completes with the non-null value [check] has returned.
*/
fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> A?): CordaFuture<A>
/**
* Polls the given function until it returns true.
* @see pollUntilNonNull
*/
fun pollUntilTrue(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> Boolean): CordaFuture<Unit> {
return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null }
}
fun start()
fun shutdown()
}
/**
* This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSL
* interface SomeOtherInternalDSLInterface : InternalDriverDSL, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSLImpl) : InternalDriverDSL by driverDSL, SomeOtherInternalDSLInterface
*
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/
fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
driverDsl: D,
initialiseSerialization: Boolean = true,
coerce: (D) -> DI,
dsl: DI.() -> A
): A {
val serializationEnv = setGlobalSerialization(initialiseSerialization)
val shutdownHook = addShutdownHook(driverDsl::shutdown)
try {
driverDsl.start()
return dsl(coerce(driverDsl))
} catch (exception: Throwable) {
DriverDSLImpl.log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
shutdownHook.cancel()
serializationEnv.unset()
}
}
/**
* This is a helper method to allow extending of the DSL, along the lines of
* interface SomeOtherExposedDSLInterface : DriverDSL
* interface SomeOtherInternalDSLInterface : InternalDriverDSL, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSLImpl) : InternalDriverDSL by driverDSL, SomeOtherInternalDSLInterface
*
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/
fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
defaultParameters: DriverParameters = DriverParameters(),
isDebug: Boolean = defaultParameters.isDebug,
driverDirectory: Path = defaultParameters.driverDirectory,
portAllocation: PortAllocation = defaultParameters.portAllocation,
debugPortAllocation: PortAllocation = defaultParameters.debugPortAllocation,
systemProperties: Map<String, String> = defaultParameters.systemProperties,
useTestClock: Boolean = defaultParameters.useTestClock,
initialiseSerialization: Boolean = defaultParameters.initialiseSerialization,
waitForNodesToFinish: Boolean = defaultParameters.waitForAllNodesToFinish,
startNodesInProcess: Boolean = defaultParameters.startNodesInProcess,
notarySpecs: List<NotarySpec>,
extraCordappPackagesToScan: List<String> = defaultParameters.extraCordappPackagesToScan,
driverDslWrapper: (DriverDSLImpl) -> D,
coerce: (D) -> DI, dsl: DI.() -> A
): A {
val serializationEnv = setGlobalSerialization(initialiseSerialization)
val driverDsl = driverDslWrapper(
DriverDSLImpl(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
)
)
val shutdownHook = addShutdownHook(driverDsl::shutdown)
try {
driverDsl.start()
return dsl(coerce(driverDsl))
} catch (exception: Throwable) {
DriverDSLImpl.log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
shutdownHook.cancel()
serializationEnv.unset()
}
}
fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC).format(Instant.now())
}
fun writeConfig(path: Path, filename: String, config: Config) {
val configString = config.root().render(ConfigRenderOptions.defaults())
configString.byteInputStream().copyTo(path / filename, StandardCopyOption.REPLACE_EXISTING)
}

View File

@ -0,0 +1,93 @@
package net.corda.testing.internal
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.times
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import org.slf4j.LoggerFactory
import java.net.Socket
import java.net.SocketException
import java.time.Duration
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
private val log = LoggerFactory.getLogger("net.corda.testing.internal.InternalTestUtils")
/**
* @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended.
*/
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null) {
addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow()
}
fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null): CordaFuture<Unit> {
return poll(executorService, "address $hostAndPort to bind") {
if (listenProcess != null && !listenProcess.isAlive) {
throw ListenProcessDeathException(hostAndPort, listenProcess)
}
try {
Socket(hostAndPort.host, hostAndPort.port).close()
Unit
} catch (_exception: SocketException) {
null
}
}
}
/*
* The default timeout value of 40 seconds have been chosen based on previous node shutdown time estimate.
* It's been observed that nodes can take up to 30 seconds to shut down, so just to stay on the safe side the 60 seconds
* timeout has been chosen.
*/
fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, timeout: Duration = 40.seconds) {
addressMustNotBeBoundFuture(executorService, hostAndPort).getOrThrow(timeout)
}
fun addressMustNotBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort): CordaFuture<Unit> {
return poll(executorService, "address $hostAndPort to unbind") {
try {
Socket(hostAndPort.host, hostAndPort.port).close()
null
} catch (_exception: SocketException) {
Unit
}
}
}
fun <A> poll(
executorService: ScheduledExecutorService,
pollName: String,
pollInterval: Duration = 500.millis,
warnCount: Int = 120,
check: () -> A?
): CordaFuture<A> {
val resultFuture = openFuture<A>()
val task = object : Runnable {
var counter = -1
override fun run() {
if (resultFuture.isCancelled) return // Give up, caller can no longer get the result.
if (++counter == warnCount) {
log.warn("Been polling $pollName for ${(pollInterval * warnCount.toLong()).seconds} seconds...")
}
try {
val checkResult = check()
if (checkResult != null) {
resultFuture.set(checkResult)
} else {
executorService.schedule(this, pollInterval.toMillis(), TimeUnit.MILLISECONDS)
}
} catch (t: Throwable) {
resultFuture.setException(t)
}
}
}
executorService.submit(task) // The check may be expensive, so always run it in the background even the first time.
return resultFuture
}
class ListenProcessDeathException(hostAndPort: NetworkHostAndPort, listenProcess: Process) :
CordaException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")

View File

@ -12,13 +12,8 @@ import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.config.*
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.configOf
import net.corda.node.services.config.parseAsNodeConfiguration
import net.corda.node.services.config.plus
import net.corda.nodeapi.internal.config.User
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.driver.addressMustNotBeBoundFuture
import net.corda.testing.getFreeLocalPorts
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import org.apache.logging.log4j.Level

View File

@ -52,159 +52,25 @@ import java.nio.file.Path
import java.nio.file.Paths
import java.util.*
interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface {
/**
* Starts an In-VM RPC server. Note that only a single one may be started.
*
* @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user.
* @param configuration The RPC server configuration.
* @param ops The server-side implementation of the RPC interface.
*/
fun <I : RPCOps> startInVmRpcServer(
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
ops: I
): CordaFuture<RpcServerHandle>
/**
* Starts an In-VM RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
*/
fun <I : RPCOps> startInVmRpcClient(
rpcOpsClass: Class<I>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default
): CordaFuture<I>
/**
* Starts an In-VM Artemis session connecting to the RPC server.
*
* @param username The username to authenticate with.
* @param password The password to authenticate with.
*/
fun startInVmArtemisSession(
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): ClientSession
/**
* Starts a Netty RPC server.
*
* @param serverName The name of the server, to be used for the folder created for Artemis files.
* @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user.
* @param configuration The RPC server configuration.
* @param ops The server-side implementation of the RPC interface.
*/
fun <I : RPCOps> startRpcServer(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
customPort: NetworkHostAndPort? = null,
ops: I
): CordaFuture<RpcServerHandle>
/**
* Starts a Netty RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param rpcAddress The address of the RPC server to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
*/
fun <I : RPCOps> startRpcClient(
rpcOpsClass: Class<I>,
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default
): CordaFuture<I>
/**
* Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param rpcAddress The address of the RPC server to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
*/
fun <I : RPCOps> startRandomRpcClient(
rpcOpsClass: Class<I>,
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): CordaFuture<Process>
/**
* Starts a Netty Artemis session connecting to an RPC server.
*
* @param rpcAddress The address of the RPC server.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
*/
fun startArtemisSession(
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): ClientSession
fun startRpcBroker(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
customPort: NetworkHostAndPort? = null
): CordaFuture<RpcBrokerHandle>
fun startInVmRpcBroker(
rpcUser: User = rpcTestUser,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE
): CordaFuture<RpcBrokerHandle>
fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
ops: I,
brokerHandle: RpcBrokerHandle
): RpcServerHandle
}
inline fun <reified I : RPCOps> RPCDriverExposedDSLInterface.startInVmRpcClient(
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default
) = startInVmRpcClient(I::class.java, username, password, configuration)
inline fun <reified I : RPCOps> RPCDriverExposedDSLInterface.startRandomRpcClient(
inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient(
hostAndPort: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
) = startRandomRpcClient(I::class.java, hostAndPort, username, password)
inline fun <reified I : RPCOps> RPCDriverExposedDSLInterface.startRpcClient(
inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
interface RPCDriverInternalDSLInterface : DriverDSLInternalInterface, RPCDriverExposedDSLInterface
data class RpcBrokerHandle(
val hostAndPort: NetworkHostAndPort?,
/** null if this is an InVM broker */
@ -235,26 +101,28 @@ fun <A> rpcDriver(
extraCordappPackagesToScan: List<String> = emptyList(),
notarySpecs: List<NotarySpec> = emptyList(),
externalTrace: Trace? = null,
dsl: RPCDriverExposedDSLInterface.() -> A
) = genericDriver(
driverDsl = RPCDriverDSL(
DriverDSL(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
), externalTrace
),
coerce = { it },
dsl = dsl,
initialiseSerialization = false
)
dsl: RPCDriverDSL.() -> A
): A {
return genericDriver(
driverDsl = RPCDriverDSL(
DriverDSLImpl(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock,
isDebug = isDebug,
startNodesInProcess = startNodesInProcess,
waitForNodesToFinish = waitForNodesToFinish,
extraCordappPackagesToScan = extraCordappPackagesToScan,
notarySpecs = notarySpecs
), externalTrace
),
coerce = { it },
dsl = dsl,
initialiseSerialization = false
)
}
private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 {
override fun validateUser(user: String?, password: String?) = isValid(user, password)
@ -276,8 +144,8 @@ private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityMan
}
data class RPCDriverDSL(
private val driverDSL: DriverDSL, private val externalTrace: Trace?
) : DriverDSLInternalInterface by driverDSL, RPCDriverInternalDSLInterface {
private val driverDSL: DriverDSLImpl, private val externalTrace: Trace?
) : InternalDriverDSL by driverDSL {
private companion object {
val notificationAddress = "notifications"
@ -340,12 +208,20 @@ data class RPCDriverDSL(
}
}
override fun <I : RPCOps> startInVmRpcServer(
rpcUser: User,
nodeLegalName: CordaX500Name,
maxFileSize: Int,
maxBufferedBytesPerClient: Long,
configuration: RPCServerConfiguration,
/**
* Starts an In-VM RPC server. Note that only a single one may be started.
*
* @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user.
* @param configuration The RPC server configuration.
* @param ops The server-side implementation of the RPC interface.
*/
fun <I : RPCOps> startInVmRpcServer(
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
ops: I
): CordaFuture<RpcServerHandle> {
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
@ -353,7 +229,20 @@ data class RPCDriverDSL(
}
}
override fun <I : RPCOps> startInVmRpcClient(rpcOpsClass: Class<I>, username: String, password: String, configuration: RPCClientConfiguration): CordaFuture<I> {
/**
* Starts an In-VM RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
*/
fun <I : RPCOps> startInVmRpcClient(
rpcOpsClass: Class<I>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default
): CordaFuture<I> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(inVmClientTransportConfiguration, configuration)
val connection = client.start(rpcOpsClass, username, password, externalTrace)
@ -364,7 +253,16 @@ data class RPCDriverDSL(
}
}
override fun startInVmArtemisSession(username: String, password: String): ClientSession {
/**
* Starts an In-VM Artemis session connecting to the RPC server.
*
* @param username The username to authenticate with.
* @param password The password to authenticate with.
*/
fun startInVmArtemisSession(
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): ClientSession {
val locator = ActiveMQClient.createServerLocatorWithoutHA(inVmClientTransportConfiguration)
val sessionFactory = locator.createSessionFactory()
val session = sessionFactory.createSession(username, password, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
@ -376,14 +274,23 @@ data class RPCDriverDSL(
return session
}
override fun <I : RPCOps> startRpcServer(
serverName: String,
rpcUser: User,
nodeLegalName: CordaX500Name,
maxFileSize: Int,
maxBufferedBytesPerClient: Long,
configuration: RPCServerConfiguration,
customPort: NetworkHostAndPort?,
/**
* Starts a Netty RPC server.
*
* @param serverName The name of the server, to be used for the folder created for Artemis files.
* @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user.
* @param configuration The RPC server configuration.
* @param ops The server-side implementation of the RPC interface.
*/
fun <I : RPCOps> startRpcServer(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
customPort: NetworkHostAndPort? = null,
ops: I
): CordaFuture<RpcServerHandle> {
return startRpcBroker(serverName, rpcUser, maxFileSize, maxBufferedBytesPerClient, customPort).map { broker ->
@ -391,12 +298,21 @@ data class RPCDriverDSL(
}
}
override fun <I : RPCOps> startRpcClient(
/**
* Starts a Netty RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param rpcAddress The address of the RPC server to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
*/
fun <I : RPCOps> startRpcClient(
rpcOpsClass: Class<I>,
rpcAddress: NetworkHostAndPort,
username: String,
password: String,
configuration: RPCClientConfiguration
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default
): CordaFuture<I> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), rpcAddress, null), configuration)
@ -408,13 +324,37 @@ data class RPCDriverDSL(
}
}
override fun <I : RPCOps> startRandomRpcClient(rpcOpsClass: Class<I>, rpcAddress: NetworkHostAndPort, username: String, password: String): CordaFuture<Process> {
/**
* Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param rpcAddress The address of the RPC server to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
*/
fun <I : RPCOps> startRandomRpcClient(
rpcOpsClass: Class<I>,
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): CordaFuture<Process> {
val process = ProcessUtilities.startJavaProcess<RandomRpcUser>(listOf(rpcOpsClass.name, rpcAddress.toString(), username, password))
driverDSL.shutdownManager.registerProcessShutdown(process)
return doneFuture(process)
}
override fun startArtemisSession(rpcAddress: NetworkHostAndPort, username: String, password: String): ClientSession {
/**
* Starts a Netty Artemis session connecting to an RPC server.
*
* @param rpcAddress The address of the RPC server.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
*/
fun startArtemisSession(
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): ClientSession {
val locator = ActiveMQClient.createServerLocatorWithoutHA(createNettyClientTransportConfiguration(rpcAddress))
val sessionFactory = locator.createSessionFactory()
val session = sessionFactory.createSession(username, password, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
@ -427,12 +367,12 @@ data class RPCDriverDSL(
return session
}
override fun startRpcBroker(
serverName: String,
rpcUser: User,
maxFileSize: Int,
maxBufferedBytesPerClient: Long,
customPort: NetworkHostAndPort?
fun startRpcBroker(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
customPort: NetworkHostAndPort? = null
): CordaFuture<RpcBrokerHandle> {
val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort()
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
@ -452,7 +392,11 @@ data class RPCDriverDSL(
}
}
override fun startInVmRpcBroker(rpcUser: User, maxFileSize: Int, maxBufferedBytesPerClient: Long): CordaFuture<RpcBrokerHandle> {
fun startInVmRpcBroker(
rpcUser: User = rpcTestUser,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE
): CordaFuture<RpcBrokerHandle> {
return driverDSL.executorService.fork {
val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient)
val server = EmbeddedActiveMQ()
@ -471,10 +415,10 @@ data class RPCDriverDSL(
}
}
override fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User,
nodeLegalName: CordaX500Name,
configuration: RPCServerConfiguration,
fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
ops: I,
brokerHandle: RpcBrokerHandle
): RpcServerHandle {

View File

@ -1,4 +1,4 @@
package net.corda.testing.driver
package net.corda.testing.internal
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.ThreadBox

View File

@ -8,7 +8,7 @@ import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.testing.driver.DriverDSL
import net.corda.testing.internal.DriverDSLImpl
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
@ -49,8 +49,8 @@ private fun CordformDefinition.runNodes(waitForAllNodesToFinish: Boolean, block:
portAllocation = PortAllocation.Incremental(maxPort + 1),
waitForAllNodesToFinish = waitForAllNodesToFinish
) {
this as DriverDSLImpl // access internal API
setup(this)
this as DriverDSL // startCordformNode is an internal API
nodes.map {
val startedNode = startCordformNode(it)
if (it.webAddress != null) {

View File

@ -3,7 +3,7 @@ package net.corda.testing.internal.performance
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry
import com.google.common.base.Stopwatch
import net.corda.testing.driver.ShutdownManager
import net.corda.testing.internal.ShutdownManager
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch

View File

@ -3,7 +3,7 @@ package net.corda.testing.internal.performance
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.JmxReporter
import com.codahale.metrics.MetricRegistry
import net.corda.testing.driver.ShutdownManager
import net.corda.testing.internal.ShutdownManager
import java.util.concurrent.TimeUnit
import javax.management.ObjectName
import kotlin.concurrent.thread

View File

@ -1,29 +0,0 @@
@file:JvmName("X500NameUtils")
package net.corda.testing
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x500.X500NameBuilder
import org.bouncycastle.asn1.x500.style.BCStyle
/**
* Generate a distinguished name from the provided X500 .
*
* @param O organisation name.
* @param L locality.
* @param C county.
* @param CN common name.
* @param OU organisation unit.
* @param ST state.
*/
@JvmOverloads
fun getX500Name(O: String, L: String, C: String, CN: String? = null, OU: String? = null, ST: String? = null): X500Name {
return X500NameBuilder(BCStyle.INSTANCE).apply {
addRDN(BCStyle.C, C)
ST?.let { addRDN(BCStyle.ST, it) }
addRDN(BCStyle.L, L)
addRDN(BCStyle.O, O)
OU?.let { addRDN(BCStyle.OU, it) }
CN?.let { addRDN(BCStyle.CN, it) }
}.build()
}

View File

@ -16,14 +16,16 @@ import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.configureDevKeyAndTrustStores
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.testing.driver.*
import net.corda.testing.internal.ProcessUtilities
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.internal.*
import net.corda.testing.node.NotarySpec
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
@ -43,34 +45,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.atomic.AtomicInteger
/**
* This file defines an extension to [DriverDSL] that allows starting of verifier processes and
* lightweight verification requestors.
*/
interface VerifierExposedDSLInterface : DriverDSLExposedInterface {
/** Starts a lightweight verification requestor that implements the Node's Verifier API */
fun startVerificationRequestor(name: CordaX500Name): CordaFuture<VerificationRequestorHandle>
/** Starts an out of process verifier connected to [address] */
fun startVerifier(address: NetworkHostAndPort): CordaFuture<VerifierHandle>
/**
* Waits until [number] verifiers are listening for verification requests coming from the Node. Check
* [VerificationRequestorHandle.waitUntilNumberOfVerifiers] for an equivalent for requestors.
*/
fun NodeHandle.waitUntilNumberOfVerifiers(number: Int)
}
/** Starts a verifier connecting to the specified node */
fun VerifierExposedDSLInterface.startVerifier(nodeHandle: NodeHandle) =
startVerifier(nodeHandle.configuration.p2pAddress)
/** Starts a verifier connecting to the specified requestor */
fun VerifierExposedDSLInterface.startVerifier(verificationRequestorHandle: VerificationRequestorHandle) =
startVerifier(verificationRequestorHandle.p2pAddress)
interface VerifierInternalDSLInterface : DriverDSLInternalInterface, VerifierExposedDSLInterface
/**
* Behaves the same as [driver] and adds verifier-related functionality.
*/
@ -85,10 +59,10 @@ fun <A> verifierDriver(
waitForNodesToFinish: Boolean = false,
extraCordappPackagesToScan: List<String> = emptyList(),
notarySpecs: List<NotarySpec> = emptyList(),
dsl: VerifierExposedDSLInterface.() -> A
dsl: VerifierDriverDSL.() -> A
) = genericDriver(
driverDsl = VerifierDriverDSL(
DriverDSL(
DriverDSLImpl(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
systemProperties = systemProperties,
@ -143,10 +117,8 @@ data class VerificationRequestorHandle(
}
data class VerifierDriverDSL(
val driverDSL: DriverDSL
) : DriverDSLInternalInterface by driverDSL, VerifierInternalDSLInterface {
val verifierCount = AtomicInteger(0)
data class VerifierDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL {
private val verifierCount = AtomicInteger(0)
companion object {
private val log = contextLogger()
@ -183,7 +155,8 @@ data class VerifierDriverDSL(
}
}
override fun startVerificationRequestor(name: CordaX500Name): CordaFuture<VerificationRequestorHandle> {
/** Starts a lightweight verification requestor that implements the Node's Verifier API */
fun startVerificationRequestor(name: CordaX500Name): CordaFuture<VerificationRequestorHandle> {
val hostAndPort = driverDSL.portAllocation.nextHostAndPort()
return driverDSL.executorService.fork {
startVerificationRequestorInternal(name, hostAndPort)
@ -255,7 +228,8 @@ data class VerifierDriverDSL(
)
}
override fun startVerifier(address: NetworkHostAndPort): CordaFuture<VerifierHandle> {
/** Starts an out of process verifier connected to [address] */
fun startVerifier(address: NetworkHostAndPort): CordaFuture<VerifierHandle> {
log.info("Starting verifier connecting to address $address")
val id = verifierCount.andIncrement
val jdwpPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null
@ -270,6 +244,16 @@ data class VerifierDriverDSL(
return doneFuture(VerifierHandle(process))
}
/** Starts a verifier connecting to the specified node */
fun startVerifier(nodeHandle: NodeHandle): CordaFuture<VerifierHandle> {
return startVerifier(nodeHandle.configuration.p2pAddress)
}
/** Starts a verifier connecting to the specified requestor */
fun startVerifier(verificationRequestorHandle: VerificationRequestorHandle): CordaFuture<VerifierHandle> {
return startVerifier(verificationRequestorHandle.p2pAddress)
}
private fun <A> NodeHandle.connectToNode(closure: (ClientSession) -> A): A {
val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), configuration.p2pAddress, configuration)
val locator = ActiveMQClient.createServerLocatorWithoutHA(transport)
@ -280,7 +264,11 @@ data class VerifierDriverDSL(
}
}
override fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) {
/**
* Waits until [number] verifiers are listening for verification requests coming from the Node. Check
* [VerificationRequestorHandle.waitUntilNumberOfVerifiers] for an equivalent for requestors.
*/
fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) {
connectToNode { session ->
poll(driverDSL.executorService, "$number verifiers to come online") {
if (session.queueQuery(SimpleString(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount >= number) {

View File

@ -4,8 +4,8 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A
import net.corda.testing.driver.WebserverHandle
import net.corda.testing.driver.addressMustBeBound
import net.corda.testing.driver.addressMustNotBeBound
import net.corda.testing.internal.addressMustBeBound
import net.corda.testing.internal.addressMustNotBeBound
import net.corda.testing.driver.driver
import org.junit.Test
import java.util.concurrent.Executors