From 9cbdf001fb147bf3435e9e91c0e3c8cbe71424dd Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 18 Aug 2016 11:22:48 +0100 Subject: [PATCH] node-driver: Return Futures instead of waiting for full node startup --- .../kotlin/com/r3corda/node/driver/Driver.kt | 77 +++++++++++-------- .../com/r3corda/node/driver/DriverTests.kt | 14 ++-- 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 74e0152c47..7f8816de45 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -1,6 +1,7 @@ package com.r3corda.node.driver import com.google.common.net.HostAndPort +import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.messaging.MessagingService @@ -25,9 +26,7 @@ import java.net.URLClassLoader import java.nio.file.Paths import java.text.SimpleDateFormat import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeoutException +import java.util.concurrent.* import kotlin.concurrent.thread /** @@ -48,7 +47,15 @@ private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java) * This is the interface that's exposed to */ interface DriverDSLExposedInterface { - fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): NodeInfo + /** + * Starts a [Node] in a separate process. + * + * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something + * random. Note that this must be unique as the driver uses it as a primary key! + * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. + * @return The [NodeInfo] of the started up node retrieved from the network map service. + */ + fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): Future fun waitForAllNodesToFinish() val messagingService: MessagingService val networkMapCache: NetworkMapCache @@ -80,6 +87,9 @@ sealed class PortAllocation { * (...) * } * + * Note that [DriverDSL.startNode] does not wait for the node to start up synchronously returns a [Future] of the + * [NodeInfo] that may be waited on, which guarantees that the new node registered with the network map service. + * * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache] * The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService] * @@ -184,12 +194,17 @@ class DriverDSL( val debugPortAllocation: PortAllocation, val baseDirectory: String ) : DriverDSLInternalInterface { - override val networkMapCache = InMemoryNetworkMapCache() private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() private var networkMapNodeInfo: NodeInfo? = null - private val registeredProcesses = LinkedList() + + class State { + val registeredProcesses = LinkedList() + val clients = LinkedList() + var localServer: ArtemisMessagingServer? = null + } + private val state = ThreadBox(State()) //TODO: remove this once we can bundle quasar properly. private val quasarJarPath: String by lazy { @@ -219,16 +234,20 @@ class DriverDSL( ) var messagingServiceStarted = false - fun registerProcess(process: Process) = registeredProcesses.push(process) + fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) } override fun waitForAllNodesToFinish() { - registeredProcesses.forEach { - it.waitFor() + state.locked { + registeredProcesses.forEach { + it.waitFor() + } } } override fun shutdown() { - registeredProcesses.forEach(Process::destroy) + state.locked { + registeredProcesses.forEach(Process::destroy) + } /** Wait 5 seconds, then [Process.destroyForcibly] */ val finishedFuture = Executors.newSingleThreadExecutor().submit { waitForAllNodesToFinish() @@ -237,27 +256,21 @@ class DriverDSL( finishedFuture.get(5, TimeUnit.SECONDS) } catch (exception: TimeoutException) { finishedFuture.cancel(true) - registeredProcesses.forEach { - it.destroyForcibly() + state.locked { + registeredProcesses.forEach { + it.destroyForcibly() + } } } - if (messagingServiceStarted) - messagingService.stop() // Check that we shut down properly - addressMustNotBeBound(messagingService.myHostPort) + state.locked { + localServer?.run { addressMustNotBeBound(myHostPort) } + } addressMustNotBeBound(networkMapAddress) } - /** - * Starts a [Node] in a separate process. - * - * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something - * random. Note that this must be unique as the driver uses it as a primary key! - * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. - * @return The [NodeInfo] of the started up node retrieved from the network map service. - */ - override fun startNode(providedName: String?, advertisedServices: Set): NodeInfo { + override fun startNode(providedName: String?, advertisedServices: Set): Future { val messagingAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort() val debugPort = debugPortAllocation.nextPort() @@ -282,16 +295,18 @@ class DriverDSL( apiAddress = apiAddress, baseDirectory = nodeDirectory ) - registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort)) - return poll { - networkMapCache.partyNodes.forEach { - if (it.identity.name == name) { - return@poll it + return Executors.newSingleThreadExecutor().submit(Callable { + registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort)) + poll("network map cache for $name") { + networkMapCache.partyNodes.forEach { + if (it.identity.name == name) { + return@poll it + } } + null } - null - } + }) } override fun start() { diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 58d448e643..34b7e41790 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -35,9 +35,9 @@ class DriverTests { val notary = startNode("TestNotary", setOf(NotaryService.Type)) val regulator = startNode("Regulator", setOf(RegulatorService.Type)) - nodeMustBeUp(networkMapCache, notary, "TestNotary") - nodeMustBeUp(networkMapCache, regulator, "Regulator") - Pair(notary, regulator) + nodeMustBeUp(networkMapCache, notary.get(), "TestNotary") + nodeMustBeUp(networkMapCache, regulator.get(), "Regulator") + Pair(notary.get(), regulator.get()) } nodeMustBeDown(notary) nodeMustBeDown(regulator) @@ -47,8 +47,8 @@ class DriverTests { fun startingNodeWithNoServicesWorks() { val noService = driver { val noService = startNode("NoService") - nodeMustBeUp(networkMapCache, noService, "NoService") - noService + nodeMustBeUp(networkMapCache, noService.get(), "NoService") + noService.get() } nodeMustBeDown(noService) } @@ -57,8 +57,8 @@ class DriverTests { fun randomFreePortAllocationWorks() { val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) { val nodeInfo = startNode("NoService") - nodeMustBeUp(networkMapCache, nodeInfo, "NoService") - nodeInfo + nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService") + nodeInfo.get() } nodeMustBeDown(nodeInfo) }