mirror of
https://github.com/corda/corda.git
synced 2025-06-22 17:09:00 +00:00
node: Address PR comments, better resource releasing, add kdoc
This commit is contained in:
@ -30,18 +30,8 @@ import java.util.concurrent.TimeoutException
|
|||||||
/**
|
/**
|
||||||
* This file defines a small "Driver" DSL for starting up nodes.
|
* This file defines a small "Driver" DSL for starting up nodes.
|
||||||
*
|
*
|
||||||
* The process the driver is run behaves as an Artemis client and starts up other processes. Namely it first
|
* The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first
|
||||||
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes/explorers.
|
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes.
|
||||||
*
|
|
||||||
* Usage:
|
|
||||||
* driver {
|
|
||||||
* startNode(setOf(NotaryService.Type), "Notary")
|
|
||||||
* val aliceMonitor = startNode(setOf(WalletMonitorService.Type), "Alice")
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
* The base directory node directories go into may be specified in [driver] and defaults to "build/<timestamp>/".
|
|
||||||
* The node directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to
|
|
||||||
* "<randomName>-<messagingPort>" and may be specified in [DriverDSL.startNode].
|
|
||||||
*
|
*
|
||||||
* TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done.
|
* TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done.
|
||||||
* TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow.
|
* TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow.
|
||||||
@ -53,12 +43,12 @@ import java.util.concurrent.TimeoutException
|
|||||||
* This is the interface that's exposed to
|
* This is the interface that's exposed to
|
||||||
*/
|
*/
|
||||||
interface DriverDSLExposedInterface {
|
interface DriverDSLExposedInterface {
|
||||||
fun startNode(advertisedServices: Set<ServiceType>, providedName: String? = null): NodeInfo
|
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): NodeInfo
|
||||||
|
val messagingService: MessagingService
|
||||||
|
val networkMapCache: NetworkMapCache
|
||||||
}
|
}
|
||||||
|
|
||||||
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
|
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
|
||||||
val messagingService: MessagingService
|
|
||||||
val networkMapCache: NetworkMapCache
|
|
||||||
fun start()
|
fun start()
|
||||||
fun shutdown()
|
fun shutdown()
|
||||||
fun waitForAllNodesToFinish()
|
fun waitForAllNodesToFinish()
|
||||||
@ -66,7 +56,7 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
|
|||||||
|
|
||||||
sealed class PortAllocation {
|
sealed class PortAllocation {
|
||||||
abstract fun nextPort(): Int
|
abstract fun nextPort(): Int
|
||||||
fun nextHostAndPort() = HostAndPort.fromParts("localhost", nextPort())
|
fun nextHostAndPort(): HostAndPort = HostAndPort.fromParts("localhost", nextPort())
|
||||||
|
|
||||||
class Incremental(private var portCounter: Int) : PortAllocation() {
|
class Incremental(private var portCounter: Int) : PortAllocation() {
|
||||||
override fun nextPort() = portCounter++
|
override fun nextPort() = portCounter++
|
||||||
@ -79,7 +69,27 @@ sealed class PortAllocation {
|
|||||||
private val log: Logger = LoggerFactory.getLogger("Driver")
|
private val log: Logger = LoggerFactory.getLogger("Driver")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO: remove quasarJarPath once we have a proper way of bundling quasar
|
* [driver] allows one to start up nodes like this:
|
||||||
|
* driver {
|
||||||
|
* val noService = startNode("NoService")
|
||||||
|
* val notary = startNode("Notary")
|
||||||
|
*
|
||||||
|
* (...)
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]
|
||||||
|
* The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService]
|
||||||
|
*
|
||||||
|
* @param baseDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
|
||||||
|
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
|
||||||
|
* and may be specified in [DriverDSL.startNode].
|
||||||
|
* @param nodeConfigurationPath The path to the node's .conf, defaults to "reference.conf".
|
||||||
|
* @param quasarJarPath The path to quasar.jar, relative to cwd. Defaults to "lib/quasar.jar". TODO remove this once we can bundle quasar properly.
|
||||||
|
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
|
||||||
|
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
|
||||||
|
* @param with A closure to be run once the nodes have started up. Defaults to empty closure.
|
||||||
|
* @param dsl The dsl itself
|
||||||
|
* @return The value returned in the [dsl] closure
|
||||||
*/
|
*/
|
||||||
fun <A> driver(
|
fun <A> driver(
|
||||||
baseDirectory: String = "build/${getTimestampAsDirectoryName()}",
|
baseDirectory: String = "build/${getTimestampAsDirectoryName()}",
|
||||||
@ -87,6 +97,7 @@ fun <A> driver(
|
|||||||
quasarJarPath: String = "lib/quasar.jar",
|
quasarJarPath: String = "lib/quasar.jar",
|
||||||
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
|
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
|
||||||
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
|
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
|
||||||
|
with: (DriverHandle, A) -> Unit = {_driverHandle, _result -> },
|
||||||
dsl: DriverDSLExposedInterface.() -> A
|
dsl: DriverDSLExposedInterface.() -> A
|
||||||
) = genericDriver(
|
) = genericDriver(
|
||||||
driverDsl = DriverDSL(
|
driverDsl = DriverDSL(
|
||||||
@ -97,7 +108,8 @@ fun <A> driver(
|
|||||||
quasarJarPath = quasarJarPath
|
quasarJarPath = quasarJarPath
|
||||||
),
|
),
|
||||||
coerce = { it },
|
coerce = { it },
|
||||||
dsl = dsl
|
dsl = dsl,
|
||||||
|
with = with
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -112,15 +124,22 @@ fun <A> driver(
|
|||||||
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver(
|
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver(
|
||||||
driverDsl: D,
|
driverDsl: D,
|
||||||
coerce: (D) -> DI,
|
coerce: (D) -> DI,
|
||||||
dsl: DI.() -> A
|
dsl: DI.() -> A,
|
||||||
): Pair<DriverHandle, A> {
|
with: (DriverHandle, A) -> Unit
|
||||||
|
): A {
|
||||||
driverDsl.start()
|
driverDsl.start()
|
||||||
val returnValue = dsl(coerce(driverDsl))
|
val returnValue = dsl(coerce(driverDsl))
|
||||||
val shutdownHook = Thread({
|
val shutdownHook = Thread({
|
||||||
driverDsl.shutdown()
|
driverDsl.shutdown()
|
||||||
})
|
})
|
||||||
Runtime.getRuntime().addShutdownHook(shutdownHook)
|
Runtime.getRuntime().addShutdownHook(shutdownHook)
|
||||||
return Pair(DriverHandle(driverDsl, shutdownHook), returnValue)
|
try {
|
||||||
|
with(DriverHandle(driverDsl), returnValue)
|
||||||
|
} finally {
|
||||||
|
driverDsl.shutdown()
|
||||||
|
Runtime.getRuntime().removeShutdownHook(shutdownHook)
|
||||||
|
}
|
||||||
|
return returnValue
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getTimestampAsDirectoryName(): String {
|
private fun getTimestampAsDirectoryName(): String {
|
||||||
@ -130,18 +149,13 @@ private fun getTimestampAsDirectoryName(): String {
|
|||||||
return df.format(Date())
|
return df.format(Date())
|
||||||
}
|
}
|
||||||
|
|
||||||
class DriverHandle(private val driverDsl: DriverDSLInternalInterface, private val shutdownHook: Thread) {
|
class DriverHandle(private val driverDsl: DriverDSLInternalInterface) {
|
||||||
val messagingService = driverDsl.messagingService
|
val messagingService = driverDsl.messagingService
|
||||||
val networkMapCache = driverDsl.networkMapCache
|
val networkMapCache = driverDsl.networkMapCache
|
||||||
|
|
||||||
fun waitForAllNodesToFinish() {
|
fun waitForAllNodesToFinish() {
|
||||||
driverDsl.waitForAllNodesToFinish()
|
driverDsl.waitForAllNodesToFinish()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun shutdown() {
|
|
||||||
driverDsl.shutdown()
|
|
||||||
Runtime.getRuntime().removeShutdownHook(shutdownHook)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun <A> poll(f: () -> A?): A {
|
fun <A> poll(f: () -> A?): A {
|
||||||
@ -213,9 +227,18 @@ class DriverDSL(
|
|||||||
it.destroyForcibly()
|
it.destroyForcibly()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
messagingService.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun startNode(advertisedServices: Set<ServiceType>, providedName: String?): NodeInfo {
|
/**
|
||||||
|
* Starts a [Node] in a separate process.
|
||||||
|
*
|
||||||
|
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
|
||||||
|
* random. Note that this must be unique as the driver uses it as a primary key!
|
||||||
|
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
|
||||||
|
* @return The [NodeInfo] of the started up node retrieved from the network map service.
|
||||||
|
*/
|
||||||
|
override fun startNode(providedName: String?, advertisedServices: Set<ServiceType>): NodeInfo {
|
||||||
val messagingAddress = portAllocation.nextHostAndPort()
|
val messagingAddress = portAllocation.nextHostAndPort()
|
||||||
val apiAddress = portAllocation.nextHostAndPort()
|
val apiAddress = portAllocation.nextHostAndPort()
|
||||||
val debugPort = debugPortAllocation.nextPort()
|
val debugPort = debugPortAllocation.nextPort()
|
||||||
|
@ -111,9 +111,6 @@ class NodeRunner {
|
|||||||
|
|
||||||
fun parse(optionSet: OptionSet): CliParams {
|
fun parse(optionSet: OptionSet): CliParams {
|
||||||
val services = optionSet.valuesOf(services)
|
val services = optionSet.valuesOf(services)
|
||||||
if (services.isEmpty()) {
|
|
||||||
throw IllegalArgumentException("Must provide at least one --services")
|
|
||||||
}
|
|
||||||
val networkMapName = optionSet.valueOf(networkMapName)
|
val networkMapName = optionSet.valueOf(networkMapName)
|
||||||
val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.let { parsePublicKeyBase58(it) }
|
val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.let { parsePublicKeyBase58(it) }
|
||||||
val networkMapAddress = optionSet.valueOf(networkMapAddress)
|
val networkMapAddress = optionSet.valueOf(networkMapAddress)
|
||||||
@ -139,8 +136,10 @@ class NodeRunner {
|
|||||||
|
|
||||||
fun toCliArguments(): List<String> {
|
fun toCliArguments(): List<String> {
|
||||||
val cliArguments = LinkedList<String>()
|
val cliArguments = LinkedList<String>()
|
||||||
|
if (services.isNotEmpty()) {
|
||||||
cliArguments.add("--services")
|
cliArguments.add("--services")
|
||||||
cliArguments.addAll(services.map { it.toString() })
|
cliArguments.addAll(services.map { it.toString() })
|
||||||
|
}
|
||||||
if (networkMapName != null) {
|
if (networkMapName != null) {
|
||||||
cliArguments.add("--network-map-name")
|
cliArguments.add("--network-map-name")
|
||||||
cliArguments.add(networkMapName)
|
cliArguments.add(networkMapName)
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
package com.r3corda.node.driver
|
package com.r3corda.node.driver
|
||||||
|
|
||||||
|
import com.google.common.net.HostAndPort
|
||||||
|
import com.r3corda.core.node.NodeInfo
|
||||||
|
import com.r3corda.core.node.services.NetworkMapCache
|
||||||
|
import com.r3corda.node.services.api.RegulatorService
|
||||||
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
|
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
|
||||||
import com.r3corda.node.services.transactions.NotaryService
|
import com.r3corda.node.services.transactions.NotaryService
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
@ -9,40 +13,79 @@ import java.net.SocketException
|
|||||||
|
|
||||||
class DriverTests {
|
class DriverTests {
|
||||||
|
|
||||||
@Test
|
companion object {
|
||||||
fun simpleNodeStartupShutdownWorks() {
|
fun addressMustBeBound(hostAndPort: HostAndPort) {
|
||||||
|
poll {
|
||||||
// Start a notary
|
try {
|
||||||
val (handle, notaryNodeInfo) = driver(quasarJarPath = "../lib/quasar.jar") {
|
Socket(hostAndPort.hostText, hostAndPort.port).close()
|
||||||
startNode(setOf(NotaryService.Type), "TestNotary")
|
Unit
|
||||||
|
} catch (_exception: SocketException) {
|
||||||
|
null
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun addressMustNotBeBound(hostAndPort: HostAndPort) {
|
||||||
|
poll {
|
||||||
|
try {
|
||||||
|
Socket(hostAndPort.hostText, hostAndPort.port).close()
|
||||||
|
null
|
||||||
|
} catch (_exception: SocketException) {
|
||||||
|
Unit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
|
||||||
|
val address = nodeInfo.address as ArtemisMessagingComponent.Address
|
||||||
// Check that the node is registered in the network map
|
// Check that the node is registered in the network map
|
||||||
poll {
|
poll {
|
||||||
handle.networkMapCache.get(NotaryService.Type).firstOrNull {
|
networkMapCache.get().firstOrNull {
|
||||||
it.identity.name == "TestNotary"
|
it.identity.name == nodeName
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Check that the port is bound
|
// Check that the port is bound
|
||||||
val address = notaryNodeInfo.address as ArtemisMessagingComponent.Address
|
addressMustBeBound(address.hostAndPort)
|
||||||
poll {
|
}
|
||||||
try {
|
|
||||||
Socket(address.hostAndPort.hostText, address.hostAndPort.port).close()
|
fun nodeMustBeDown(nodeInfo: NodeInfo) {
|
||||||
Unit
|
val address = nodeInfo.address as ArtemisMessagingComponent.Address
|
||||||
} catch (_exception: SocketException) {
|
// Check that the port is bound
|
||||||
null
|
addressMustNotBeBound(address.hostAndPort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown
|
@Test
|
||||||
handle.shutdown()
|
fun simpleNodeStartupShutdownWorks() {
|
||||||
// Check that the port is not bound
|
val (notary, regulator) = driver(quasarJarPath = "../lib/quasar.jar") {
|
||||||
poll {
|
val notary = startNode("TestNotary", setOf(NotaryService.Type))
|
||||||
try {
|
val regulator = startNode("Regulator", setOf(RegulatorService.Type))
|
||||||
Socket(address.hostAndPort.hostText, address.hostAndPort.port).close()
|
|
||||||
null
|
nodeMustBeUp(networkMapCache, notary, "TestNotary")
|
||||||
} catch (_exception: SocketException) {
|
nodeMustBeUp(networkMapCache, regulator, "Regulator")
|
||||||
Unit
|
Pair(notary, regulator)
|
||||||
}
|
}
|
||||||
|
nodeMustBeDown(notary)
|
||||||
|
nodeMustBeDown(regulator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun startingNodeWithNoServicesWorks() {
|
||||||
|
val noService = driver(quasarJarPath = "../lib/quasar.jar") {
|
||||||
|
val noService = startNode("NoService")
|
||||||
|
nodeMustBeUp(networkMapCache, noService, "NoService")
|
||||||
|
noService
|
||||||
|
}
|
||||||
|
nodeMustBeDown(noService)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun randomFreePortAllocationWorks() {
|
||||||
|
val nodeInfo = driver(quasarJarPath = "../lib/quasar.jar", portAllocation = PortAllocation.RandomFree()) {
|
||||||
|
val nodeInfo = startNode("NoService")
|
||||||
|
nodeMustBeUp(networkMapCache, nodeInfo, "NoService")
|
||||||
|
nodeInfo
|
||||||
|
}
|
||||||
|
nodeMustBeDown(nodeInfo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user