diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 74e0152c47..6db3a1c656 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -1,15 +1,17 @@ package com.r3corda.node.driver import com.google.common.net.HostAndPort +import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.generateKeyPair -import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.messaging.ArtemisMessagingClient +import com.r3corda.node.services.messaging.ArtemisMessagingComponent +import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.utilities.AffinityExecutor @@ -25,9 +27,7 @@ import java.net.URLClassLoader import java.nio.file.Paths import java.text.SimpleDateFormat import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException +import java.util.concurrent.* import kotlin.concurrent.thread /** @@ -45,15 +45,45 @@ import kotlin.concurrent.thread private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java) /** - * This is the interface that's exposed to + * This is the interface that's exposed to DSL users. */ interface DriverDSLExposedInterface { - fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): NodeInfo + /** + * Starts a [Node] in a separate process. + * + * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something + * random. Note that this must be unique as the driver uses it as a primary key! + * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. + * @return The [NodeInfo] of the started up node retrieved from the network map service. + */ + fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): Future + + /** + * Starts an [ArtemisMessagingClient]. + * + * @param providedName name of the client, which will be used for creating its directory. + * @param serverAddress the artemis server to connect to, for example a [Node]. + * @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null. + */ + fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future + /** + * Starts a local [ArtemisMessagingServer] of which there may only be one. + */ + fun startLocalServer(): Future fun waitForAllNodesToFinish() - val messagingService: MessagingService val networkMapCache: NetworkMapCache } +fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) = + startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort) + +fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) = + startClient( + providedName = providedName ?: "${remoteNodeInfo.identity.name}-client", + serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort, + clientAddress = null + ) + interface DriverDSLInternalInterface : DriverDSLExposedInterface { fun start() fun shutdown() @@ -80,27 +110,32 @@ sealed class PortAllocation { * (...) * } * - * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache] - * The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService] + * Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [Future] + * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service. + * + * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]. * * @param baseDirectory The base directory node directories go into, defaults to "build//". The node * directories themselves are "//", where legalName defaults to "-" * and may be specified in [DriverDSL.startNode]. * @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental. * @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental. - * @param dsl The dsl itself - * @return The value returned in the [dsl] closure + * @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode. + * @param dsl The dsl itself. + * @return The value returned in the [dsl] closure. */ fun driver( baseDirectory: String = "build/${getTimestampAsDirectoryName()}", portAllocation: PortAllocation = PortAllocation.Incremental(10000), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), + isDebug: Boolean = false, dsl: DriverDSLExposedInterface.() -> A ) = genericDriver( driverDsl = DriverDSL( - portAllocation = portAllocation, - debugPortAllocation = debugPortAllocation, - baseDirectory = baseDirectory + portAllocation = portAllocation, + debugPortAllocation = debugPortAllocation, + baseDirectory = baseDirectory, + isDebug = isDebug ), coerce = { it }, dsl = dsl @@ -112,7 +147,7 @@ fun driver( * interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface * class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface * - * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause + * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. */ fun genericDriver( driverDsl: D, @@ -144,7 +179,7 @@ private fun getTimestampAsDirectoryName(): String { } fun addressMustBeBound(hostAndPort: HostAndPort) { - poll { + poll("address $hostAndPort to bind") { try { Socket(hostAndPort.hostText, hostAndPort.port).close() Unit @@ -155,7 +190,7 @@ fun addressMustBeBound(hostAndPort: HostAndPort) { } fun addressMustNotBeBound(hostAndPort: HostAndPort) { - poll { + poll("address $hostAndPort to unbind") { try { Socket(hostAndPort.hostText, hostAndPort.port).close() null @@ -165,31 +200,37 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) { } } -fun poll(f: () -> A?): A { +fun poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120, f: () -> A?): A { var counter = 0 var result = f() - while (result == null && counter < 120) { - counter++ - Thread.sleep(500) + while (result == null) { + if (counter == warnCount) { + log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...") + } + counter = (counter % warnCount) + 1 + Thread.sleep(pollIntervalMs) result = f() } - if (result == null) { - throw Exception("Poll timed out") - } return result } class DriverDSL( val portAllocation: PortAllocation, val debugPortAllocation: PortAllocation, - val baseDirectory: String + val baseDirectory: String, + val isDebug: Boolean ) : DriverDSLInternalInterface { - override val networkMapCache = InMemoryNetworkMapCache() private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() private var networkMapNodeInfo: NodeInfo? = null - private val registeredProcesses = LinkedList() + + class State { + val registeredProcesses = LinkedList() + val clients = LinkedList() + var localServer: ArtemisMessagingServer? = null + } + private val state = ThreadBox(State()) //TODO: remove this once we can bundle quasar properly. private val quasarJarPath: String by lazy { @@ -200,35 +241,24 @@ class DriverDSL( Paths.get(quasarFileUrl.toURI()).toString() } - val driverNodeConfiguration = NodeConfigurationFromConfig( - NodeConfiguration.loadConfig( - baseDirectoryPath = Paths.get(baseDirectory, "driver-artemis"), - allowMissingConfig = true, - configOverrides = mapOf( - "myLegalName" to "driver-artemis" - ) - ) - ) - - override val messagingService = ArtemisMessagingClient( - Paths.get(baseDirectory, "driver-artemis"), - driverNodeConfiguration, - serverHostPort = networkMapAddress, - myHostPort = portAllocation.nextHostAndPort(), - executor = AffinityExecutor.ServiceAffinityExecutor("Client thread", 1) - ) - var messagingServiceStarted = false - - fun registerProcess(process: Process) = registeredProcesses.push(process) + fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) } override fun waitForAllNodesToFinish() { - registeredProcesses.forEach { - it.waitFor() + state.locked { + registeredProcesses.forEach { + it.waitFor() + } } } override fun shutdown() { - registeredProcesses.forEach(Process::destroy) + state.locked { + clients.forEach { + it.stop() + } + localServer?.stop() + registeredProcesses.forEach(Process::destroy) + } /** Wait 5 seconds, then [Process.destroyForcibly] */ val finishedFuture = Executors.newSingleThreadExecutor().submit { waitForAllNodesToFinish() @@ -237,30 +267,24 @@ class DriverDSL( finishedFuture.get(5, TimeUnit.SECONDS) } catch (exception: TimeoutException) { finishedFuture.cancel(true) - registeredProcesses.forEach { - it.destroyForcibly() + state.locked { + registeredProcesses.forEach { + it.destroyForcibly() + } } } - if (messagingServiceStarted) - messagingService.stop() // Check that we shut down properly - addressMustNotBeBound(messagingService.myHostPort) + state.locked { + localServer?.run { addressMustNotBeBound(myHostPort) } + } addressMustNotBeBound(networkMapAddress) } - /** - * Starts a [Node] in a separate process. - * - * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something - * random. Note that this must be unique as the driver uses it as a primary key! - * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. - * @return The [NodeInfo] of the started up node retrieved from the network map service. - */ - override fun startNode(providedName: String?, advertisedServices: Set): NodeInfo { + override fun startNode(providedName: String?, advertisedServices: Set): Future { val messagingAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort() - val debugPort = debugPortAllocation.nextPort() + val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" val nodeDirectory = "$baseDirectory/$name" @@ -282,26 +306,86 @@ class DriverDSL( apiAddress = apiAddress, baseDirectory = nodeDirectory ) - registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort)) - return poll { - networkMapCache.partyNodes.forEach { - if (it.identity.name == name) { - return@poll it + return Executors.newSingleThreadExecutor().submit(Callable { + registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort)) + poll("network map cache for $name") { + networkMapCache.partyNodes.forEach { + if (it.identity.name == name) { + return@poll it + } } + null } - null - } + }) } + override fun startClient( + providedName: String, + serverAddress: HostAndPort, + clientAddress: HostAndPort? + ): Future { + + val nodeConfiguration = NodeConfigurationFromConfig( + NodeConfiguration.loadConfig( + baseDirectoryPath = Paths.get(baseDirectory, providedName), + allowMissingConfig = true, + configOverrides = mapOf( + "myLegalName" to providedName + ) + ) + ) + val client = ArtemisMessagingClient( + Paths.get(baseDirectory, providedName), + nodeConfiguration, + serverHostPort = serverAddress, + myHostPort = clientAddress ?: serverAddress, + executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1) + ) + + return Executors.newSingleThreadExecutor().submit(Callable { + client.configureWithDevSSLCertificate() + client.start() + thread { client.run() } + state.locked { + clients.add(client) + } + client + }) + } + + override fun startLocalServer(): Future { + val name = "driver-local-server" + val config = NodeConfigurationFromConfig( + NodeConfiguration.loadConfig( + baseDirectoryPath = Paths.get(baseDirectory, name), + allowMissingConfig = true, + configOverrides = mapOf( + "myLegalName" to name + ) + ) + ) + val server = ArtemisMessagingServer( + Paths.get(baseDirectory, name), + config, + portAllocation.nextHostAndPort() + ) + return Executors.newSingleThreadExecutor().submit(Callable { + server.configureWithDevSSLCertificate() + server.start() + state.locked { + localServer = server + } + server + }) + } + + override fun start() { startNetworkMapService() - messagingService.configureWithDevSSLCertificate() - messagingService.start() - thread { messagingService.run() } - messagingServiceStarted = true + val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get() // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from - // the network map service itself + // the network map service itself. val fakeNodeInfo = NodeInfo( address = ArtemisMessagingClient.makeRecipient(networkMapAddress), identity = Party( @@ -310,8 +394,8 @@ class DriverDSL( ), advertisedServices = setOf(NetworkMapService.Type) ) - networkMapCache.addMapService(messagingService, fakeNodeInfo, true) - networkMapNodeInfo = poll { + networkMapCache.addMapService(networkMapClient, fakeNodeInfo, true) + networkMapNodeInfo = poll("network map cache for $networkMapName") { networkMapCache.partyNodes.forEach { if (it.identity.name == networkMapName) { return@poll it @@ -323,7 +407,7 @@ class DriverDSL( private fun startNetworkMapService() { val apiAddress = portAllocation.nextHostAndPort() - val debugPort = debugPortAllocation.nextPort() + val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val nodeDirectory = "$baseDirectory/$networkMapName" @@ -363,7 +447,7 @@ class DriverDSL( cliParams: NodeRunner.CliParams, legalName: String, quasarJarPath: String, - debugPort: Int + debugPort: Int? ): Process { // Write node.conf @@ -375,9 +459,12 @@ class DriverDSL( val path = System.getProperty("java.home") + separator + "bin" + separator + "java" val javaArgs = listOf(path) + listOf("-Dname=$legalName", "-javaagent:$quasarJarPath", - "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort", "-cp", classpath, className) + - cliParams.toCliArguments() + cliParams.toCliArguments() + + if (debugPort != null) + listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort") + else + listOf() val builder = ProcessBuilder(javaArgs) builder.redirectError(Paths.get("error.$className.log").toFile()) builder.inheritIO() diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 58d448e643..311314cb7c 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -13,7 +13,7 @@ class DriverTests { fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) { val address = nodeInfo.address as ArtemisMessagingComponent.Address // Check that the node is registered in the network map - poll { + poll("network map cache for $nodeName") { networkMapCache.get().firstOrNull { it.identity.name == nodeName } @@ -35,9 +35,9 @@ class DriverTests { val notary = startNode("TestNotary", setOf(NotaryService.Type)) val regulator = startNode("Regulator", setOf(RegulatorService.Type)) - nodeMustBeUp(networkMapCache, notary, "TestNotary") - nodeMustBeUp(networkMapCache, regulator, "Regulator") - Pair(notary, regulator) + nodeMustBeUp(networkMapCache, notary.get(), "TestNotary") + nodeMustBeUp(networkMapCache, regulator.get(), "Regulator") + Pair(notary.get(), regulator.get()) } nodeMustBeDown(notary) nodeMustBeDown(regulator) @@ -47,8 +47,8 @@ class DriverTests { fun startingNodeWithNoServicesWorks() { val noService = driver { val noService = startNode("NoService") - nodeMustBeUp(networkMapCache, noService, "NoService") - noService + nodeMustBeUp(networkMapCache, noService.get(), "NoService") + noService.get() } nodeMustBeDown(noService) } @@ -57,8 +57,8 @@ class DriverTests { fun randomFreePortAllocationWorks() { val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) { val nodeInfo = startNode("NoService") - nodeMustBeUp(networkMapCache, nodeInfo, "NoService") - nodeInfo + nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService") + nodeInfo.get() } nodeMustBeDown(nodeInfo) }