node-driver: Return Futures instead of waiting for full node startup

This commit is contained in:
Andras Slemmer 2016-08-18 11:22:48 +01:00
parent 21092e2bb6
commit 9cbdf001fb
2 changed files with 53 additions and 38 deletions

View File

@ -1,6 +1,7 @@
package com.r3corda.node.driver package com.r3corda.node.driver
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
@ -25,9 +26,7 @@ import java.net.URLClassLoader
import java.nio.file.Paths import java.nio.file.Paths
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.* import java.util.*
import java.util.concurrent.Executors import java.util.concurrent.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.concurrent.thread import kotlin.concurrent.thread
/** /**
@ -48,7 +47,15 @@ private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java)
* This is the interface that's exposed to * This is the interface that's exposed to
*/ */
interface DriverDSLExposedInterface { interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): NodeInfo /**
* Starts a [Node] in a separate process.
*
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
*/
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): Future<NodeInfo>
fun waitForAllNodesToFinish() fun waitForAllNodesToFinish()
val messagingService: MessagingService val messagingService: MessagingService
val networkMapCache: NetworkMapCache val networkMapCache: NetworkMapCache
@ -80,6 +87,9 @@ sealed class PortAllocation {
* (...) * (...)
* } * }
* *
* Note that [DriverDSL.startNode] does not wait for the node to start up synchronously returns a [Future] of the
* [NodeInfo] that may be waited on, which guarantees that the new node registered with the network map service.
*
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache] * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]
* The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService] * The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService]
* *
@ -184,12 +194,17 @@ class DriverDSL(
val debugPortAllocation: PortAllocation, val debugPortAllocation: PortAllocation,
val baseDirectory: String val baseDirectory: String
) : DriverDSLInternalInterface { ) : DriverDSLInternalInterface {
override val networkMapCache = InMemoryNetworkMapCache() override val networkMapCache = InMemoryNetworkMapCache()
private val networkMapName = "NetworkMapService" private val networkMapName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort() private val networkMapAddress = portAllocation.nextHostAndPort()
private var networkMapNodeInfo: NodeInfo? = null private var networkMapNodeInfo: NodeInfo? = null
private val registeredProcesses = LinkedList<Process>()
class State {
val registeredProcesses = LinkedList<Process>()
val clients = LinkedList<ArtemisMessagingClient>()
var localServer: ArtemisMessagingServer? = null
}
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly. //TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy { private val quasarJarPath: String by lazy {
@ -219,16 +234,20 @@ class DriverDSL(
) )
var messagingServiceStarted = false var messagingServiceStarted = false
fun registerProcess(process: Process) = registeredProcesses.push(process) fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) }
override fun waitForAllNodesToFinish() { override fun waitForAllNodesToFinish() {
state.locked {
registeredProcesses.forEach { registeredProcesses.forEach {
it.waitFor() it.waitFor()
} }
} }
}
override fun shutdown() { override fun shutdown() {
state.locked {
registeredProcesses.forEach(Process::destroy) registeredProcesses.forEach(Process::destroy)
}
/** Wait 5 seconds, then [Process.destroyForcibly] */ /** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = Executors.newSingleThreadExecutor().submit { val finishedFuture = Executors.newSingleThreadExecutor().submit {
waitForAllNodesToFinish() waitForAllNodesToFinish()
@ -237,27 +256,21 @@ class DriverDSL(
finishedFuture.get(5, TimeUnit.SECONDS) finishedFuture.get(5, TimeUnit.SECONDS)
} catch (exception: TimeoutException) { } catch (exception: TimeoutException) {
finishedFuture.cancel(true) finishedFuture.cancel(true)
state.locked {
registeredProcesses.forEach { registeredProcesses.forEach {
it.destroyForcibly() it.destroyForcibly()
} }
} }
if (messagingServiceStarted) }
messagingService.stop()
// Check that we shut down properly // Check that we shut down properly
addressMustNotBeBound(messagingService.myHostPort) state.locked {
localServer?.run { addressMustNotBeBound(myHostPort) }
}
addressMustNotBeBound(networkMapAddress) addressMustNotBeBound(networkMapAddress)
} }
/** override fun startNode(providedName: String?, advertisedServices: Set<ServiceType>): Future<NodeInfo> {
* Starts a [Node] in a separate process.
*
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
*/
override fun startNode(providedName: String?, advertisedServices: Set<ServiceType>): NodeInfo {
val messagingAddress = portAllocation.nextHostAndPort() val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort()
val debugPort = debugPortAllocation.nextPort() val debugPort = debugPortAllocation.nextPort()
@ -282,9 +295,10 @@ class DriverDSL(
apiAddress = apiAddress, apiAddress = apiAddress,
baseDirectory = nodeDirectory baseDirectory = nodeDirectory
) )
registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort))
return poll { return Executors.newSingleThreadExecutor().submit(Callable<NodeInfo> {
registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort))
poll("network map cache for $name") {
networkMapCache.partyNodes.forEach { networkMapCache.partyNodes.forEach {
if (it.identity.name == name) { if (it.identity.name == name) {
return@poll it return@poll it
@ -292,6 +306,7 @@ class DriverDSL(
} }
null null
} }
})
} }
override fun start() { override fun start() {

View File

@ -35,9 +35,9 @@ class DriverTests {
val notary = startNode("TestNotary", setOf(NotaryService.Type)) val notary = startNode("TestNotary", setOf(NotaryService.Type))
val regulator = startNode("Regulator", setOf(RegulatorService.Type)) val regulator = startNode("Regulator", setOf(RegulatorService.Type))
nodeMustBeUp(networkMapCache, notary, "TestNotary") nodeMustBeUp(networkMapCache, notary.get(), "TestNotary")
nodeMustBeUp(networkMapCache, regulator, "Regulator") nodeMustBeUp(networkMapCache, regulator.get(), "Regulator")
Pair(notary, regulator) Pair(notary.get(), regulator.get())
} }
nodeMustBeDown(notary) nodeMustBeDown(notary)
nodeMustBeDown(regulator) nodeMustBeDown(regulator)
@ -47,8 +47,8 @@ class DriverTests {
fun startingNodeWithNoServicesWorks() { fun startingNodeWithNoServicesWorks() {
val noService = driver { val noService = driver {
val noService = startNode("NoService") val noService = startNode("NoService")
nodeMustBeUp(networkMapCache, noService, "NoService") nodeMustBeUp(networkMapCache, noService.get(), "NoService")
noService noService.get()
} }
nodeMustBeDown(noService) nodeMustBeDown(noService)
} }
@ -57,8 +57,8 @@ class DriverTests {
fun randomFreePortAllocationWorks() { fun randomFreePortAllocationWorks() {
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) { val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
val nodeInfo = startNode("NoService") val nodeInfo = startNode("NoService")
nodeMustBeUp(networkMapCache, nodeInfo, "NoService") nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService")
nodeInfo nodeInfo.get()
} }
nodeMustBeDown(nodeInfo) nodeMustBeDown(nodeInfo)
} }