mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
node: Add driver DSL for starting up nodes
This commit is contained in:
parent
7357597501
commit
e42d8c2e8f
244
node/src/main/kotlin/com/r3corda/node/driver/Driver.kt
Normal file
244
node/src/main/kotlin/com/r3corda/node/driver/Driver.kt
Normal file
@ -0,0 +1,244 @@
|
||||
package com.r3corda.node.driver
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.node.driver.NodeRunner
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import java.net.Socket
|
||||
import java.net.SocketException
|
||||
import java.nio.file.Paths
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* This file defines a small "Driver" DSL for starting up nodes.
|
||||
*
|
||||
* The process the driver is run behaves as an Artemis client and starts up other processes. Namely it first
|
||||
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes/explorers.
|
||||
*
|
||||
* Usage:
|
||||
* driver {
|
||||
* startNode(setOf(NotaryService.Type), "Notary")
|
||||
* val aliceMonitor = startNode(setOf(WalletMonitorService.Type), "Alice")
|
||||
* startExplorer(aliceMonitor)
|
||||
* }
|
||||
*
|
||||
* The base directory node directories go into may be specified in [driver] and defaults to "build/<timestamp>/".
|
||||
* The node directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to
|
||||
* "<randomName>-<messagingPort>" and may be specified in [DriverDSL.startNode].
|
||||
*
|
||||
* TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done.
|
||||
* TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow.
|
||||
* TODO The driver now polls the network map cache for info about newly started up nodes, this could be done asynchronously(?).
|
||||
* TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought.
|
||||
*/
|
||||
|
||||
fun <A> driver(baseDirectory: String? = null, dsl: DriverDSL.() -> A): Pair<DriverHandle, A> {
|
||||
val driverDsl = DriverDSL(10000, baseDirectory ?: "build/${getTimestampAsDirectoryName()}")
|
||||
driverDsl.start()
|
||||
val returnValue = dsl(driverDsl)
|
||||
val shutdownHook = Thread({
|
||||
driverDsl.shutdown()
|
||||
})
|
||||
Runtime.getRuntime().addShutdownHook(shutdownHook)
|
||||
return Pair(DriverHandle(driverDsl, shutdownHook), returnValue)
|
||||
}
|
||||
|
||||
private fun getTimestampAsDirectoryName(): String {
|
||||
val tz = TimeZone.getTimeZone("UTC")
|
||||
val df = SimpleDateFormat("yyyyMMddHHmmss")
|
||||
df.timeZone = tz
|
||||
return df.format(Date())
|
||||
}
|
||||
|
||||
class DriverHandle(private val driverDsl: DriverDSL, private val shutdownHook: Thread) {
|
||||
val messagingService = driverDsl.messagingService
|
||||
|
||||
fun waitForAllNodesToFinish() {
|
||||
driverDsl.waitForAllNodesToFinish()
|
||||
}
|
||||
|
||||
fun shutdown() {
|
||||
driverDsl.shutdown()
|
||||
Runtime.getRuntime().removeShutdownHook(shutdownHook)
|
||||
}
|
||||
}
|
||||
|
||||
private fun <A> poll(f: () -> A?): A {
|
||||
var counter = 0
|
||||
var result = f()
|
||||
while (result == null && counter < 30) {
|
||||
counter++
|
||||
Thread.sleep(500)
|
||||
result = f()
|
||||
}
|
||||
if (result == null) {
|
||||
throw Exception("Poll timed out")
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
class DriverDSL(private var portCounter: Int, val baseDirectory: String) {
|
||||
|
||||
private fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort())
|
||||
|
||||
val messagingService = ArtemisMessagingService(
|
||||
Paths.get(baseDirectory, "driver-artemis"),
|
||||
nextLocalHostAndPort(),
|
||||
object : NodeConfiguration {
|
||||
override val myLegalName = "driver-artemis"
|
||||
override val exportJMXto = ""
|
||||
override val nearestCity = "Zion"
|
||||
override val keyStorePassword = "keypass"
|
||||
override val trustStorePassword = "trustpass"
|
||||
}
|
||||
)
|
||||
|
||||
val networkMapCache = InMemoryNetworkMapCache(null)
|
||||
private val networkMapName = "NetworkMapService"
|
||||
private val networkMapAddress = nextLocalHostAndPort()
|
||||
private lateinit var networkMapNodeInfo: NodeInfo
|
||||
private val registeredProcesses = LinkedList<Process>()
|
||||
|
||||
private fun nextPort(): Int {
|
||||
val nextPort = portCounter
|
||||
portCounter++
|
||||
return nextPort
|
||||
}
|
||||
|
||||
fun registerProcess(process: Process) = registeredProcesses.push(process)
|
||||
|
||||
internal fun waitForAllNodesToFinish() {
|
||||
registeredProcesses.forEach {
|
||||
it.waitFor()
|
||||
}
|
||||
}
|
||||
|
||||
internal fun shutdown() {
|
||||
registeredProcesses.forEach {
|
||||
it.destroy()
|
||||
}
|
||||
registeredProcesses.forEach {
|
||||
if (!it.waitFor(5, TimeUnit.SECONDS)) {
|
||||
it.destroyForcibly()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun startNode(advertisedServices: Set<ServiceType>, providedName: String? = null): NodeInfo {
|
||||
val messagingAddress = nextLocalHostAndPort()
|
||||
val name = providedName ?: "${pickA(name)}-${messagingAddress.port}"
|
||||
val nearestCity = pickA(city)
|
||||
|
||||
val driverCliParams = NodeRunner.CliParams(
|
||||
services = advertisedServices,
|
||||
networkMapName = networkMapNodeInfo.identity.name,
|
||||
networkMapPublicKey = networkMapNodeInfo.identity.owningKey,
|
||||
networkMapAddress = networkMapAddress,
|
||||
messagingAddress = messagingAddress,
|
||||
apiAddress = nextLocalHostAndPort(),
|
||||
baseDirectory = baseDirectory,
|
||||
nearestCity = nearestCity,
|
||||
legalName = name
|
||||
)
|
||||
registerProcess(startNode(driverCliParams))
|
||||
|
||||
return poll {
|
||||
networkMapCache.partyNodes.forEach {
|
||||
if (it.identity.name == name) {
|
||||
return@poll it
|
||||
}
|
||||
}
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
internal fun start() {
|
||||
messagingService.configureWithDevSSLCertificate()
|
||||
messagingService.start()
|
||||
startNetworkMapService()
|
||||
}
|
||||
|
||||
private fun startNetworkMapService() {
|
||||
val driverCliParams = NodeRunner.CliParams(
|
||||
services = setOf(NetworkMapService.Type),
|
||||
networkMapName = null,
|
||||
networkMapPublicKey = null,
|
||||
networkMapAddress = null,
|
||||
messagingAddress = networkMapAddress,
|
||||
apiAddress = nextLocalHostAndPort(),
|
||||
baseDirectory = baseDirectory,
|
||||
nearestCity = pickA(city),
|
||||
legalName = networkMapName
|
||||
)
|
||||
println("Starting network-map-service")
|
||||
registerProcess(startNode(driverCliParams))
|
||||
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
|
||||
// the network map service itself
|
||||
val nodeInfo = NodeInfo(
|
||||
address = ArtemisMessagingService.makeRecipient(networkMapAddress),
|
||||
identity = Party(
|
||||
name = networkMapName,
|
||||
owningKey = generateKeyPair().public
|
||||
),
|
||||
advertisedServices = setOf(NetworkMapService.Type)
|
||||
)
|
||||
networkMapCache.addMapService(messagingService, nodeInfo, true)
|
||||
networkMapNodeInfo = poll {
|
||||
networkMapCache.partyNodes.forEach {
|
||||
if (it.identity.name == networkMapName) {
|
||||
return@poll it
|
||||
}
|
||||
}
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
private val city = arrayOf(
|
||||
"London",
|
||||
"Paris",
|
||||
"New York",
|
||||
"Tokyo"
|
||||
)
|
||||
private val name = arrayOf(
|
||||
"Alice",
|
||||
"Bob",
|
||||
"EvilBank",
|
||||
"NotSoEvilBank"
|
||||
)
|
||||
private fun <A> pickA(array: Array<A>): A = array[Math.abs(Random().nextInt()) % array.size]
|
||||
|
||||
private fun startNode(cliParams: NodeRunner.CliParams): Process {
|
||||
val className = NodeRunner::class.java.canonicalName
|
||||
val separator = System.getProperty("file.separator")
|
||||
val classpath = System.getProperty("java.class.path")
|
||||
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
|
||||
val javaArgs = listOf(path) +
|
||||
listOf("-Dname=${cliParams.legalName}", "-javaagent:lib/quasar.jar", "-cp", classpath, className) +
|
||||
cliParams.toCliArguments()
|
||||
val builder = ProcessBuilder(javaArgs)
|
||||
builder.redirectError(Paths.get("error.$className.log").toFile())
|
||||
builder.inheritIO()
|
||||
val process = builder.start()
|
||||
poll {
|
||||
try {
|
||||
Socket(cliParams.messagingAddress.hostText, cliParams.messagingAddress.port).close()
|
||||
Unit
|
||||
} catch (_exception: SocketException) {
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
return process
|
||||
}
|
||||
}
|
||||
}
|
165
node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt
Normal file
165
node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt
Normal file
@ -0,0 +1,165 @@
|
||||
package com.r3corda.node.driver
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.toBase58String
|
||||
import com.r3corda.core.crypto.toPublicKey
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import joptsimple.ArgumentAcceptingOptionSpec
|
||||
import joptsimple.OptionParser
|
||||
import joptsimple.OptionSet
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
|
||||
class NodeRunner {
|
||||
companion object {
|
||||
@JvmStatic fun main(arguments: Array<String>) {
|
||||
val cliParams = CliParams.parse(CliParams.parser.parse(*arguments))
|
||||
|
||||
val nodeDirectory = Paths.get(cliParams.baseDirectory, cliParams.legalName)
|
||||
createNodeRunDirectory(nodeDirectory)
|
||||
|
||||
with(cliParams) {
|
||||
|
||||
val networkMapNodeInfo =
|
||||
if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) {
|
||||
NodeInfo(
|
||||
address = ArtemisMessagingService.makeRecipient(networkMapAddress),
|
||||
identity = Party(
|
||||
name = networkMapName,
|
||||
owningKey = networkMapPublicKey
|
||||
),
|
||||
advertisedServices = setOf(NetworkMapService.Type)
|
||||
)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
val node = Node(
|
||||
dir = nodeDirectory,
|
||||
p2pAddr = messagingAddress,
|
||||
webServerAddr = apiAddress,
|
||||
configuration = object : NodeConfiguration {
|
||||
override val myLegalName = legalName
|
||||
override val exportJMXto = ""
|
||||
override val nearestCity = cliParams.nearestCity
|
||||
override val keyStorePassword = "keypass"
|
||||
override val trustStorePassword = "trustpass"
|
||||
},
|
||||
networkMapAddress = networkMapNodeInfo,
|
||||
advertisedServices = services.toSet()
|
||||
)
|
||||
|
||||
|
||||
println("Starting $legalName with services $services on addresses $messagingAddress and $apiAddress")
|
||||
node.start()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class CliParams (
|
||||
val services: Set<ServiceType>,
|
||||
val networkMapName: String?,
|
||||
val networkMapPublicKey: PublicKey?,
|
||||
val networkMapAddress: HostAndPort?,
|
||||
val messagingAddress: HostAndPort,
|
||||
val apiAddress: HostAndPort,
|
||||
val baseDirectory: String,
|
||||
val legalName: String,
|
||||
val nearestCity: String
|
||||
) {
|
||||
|
||||
companion object {
|
||||
val parser = OptionParser()
|
||||
val services =
|
||||
parser.accepts("services").withRequiredArg().ofType(String::class.java)
|
||||
val networkMapName =
|
||||
parser.accepts("network-map-name").withOptionalArg().ofType(String::class.java)
|
||||
val networkMapPublicKey =
|
||||
parser.accepts("network-map-public-key").withOptionalArg().ofType(String::class.java)
|
||||
val networkMapAddress =
|
||||
parser.accepts("network-map-address").withOptionalArg().ofType(String::class.java)
|
||||
val messagingAddress =
|
||||
parser.accepts("messaging-address").withRequiredArg().ofType(String::class.java)
|
||||
val apiAddress =
|
||||
parser.accepts("api-address").withRequiredArg().ofType(String::class.java)
|
||||
val baseDirectory =
|
||||
parser.accepts("base-directory").withRequiredArg().ofType(String::class.java)
|
||||
val nearestCity =
|
||||
parser.accepts("nearest-city").withRequiredArg().ofType(String::class.java)
|
||||
val legalName =
|
||||
parser.accepts("legal-name").withRequiredArg().ofType(String::class.java)
|
||||
|
||||
private fun <T> requiredArgument(optionSet: OptionSet, spec: ArgumentAcceptingOptionSpec<T>) =
|
||||
optionSet.valueOf(spec) ?: throw IllegalArgumentException("Must provide $spec")
|
||||
|
||||
fun parse(optionSet: OptionSet): CliParams {
|
||||
val services = optionSet.valuesOf(services)
|
||||
if (services.isEmpty()) {
|
||||
throw IllegalArgumentException("Must provide at least one --services")
|
||||
}
|
||||
val networkMapName = optionSet.valueOf(networkMapName)
|
||||
val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.toPublicKey()
|
||||
val networkMapAddress = optionSet.valueOf(networkMapAddress)
|
||||
val messagingAddress = requiredArgument(optionSet, messagingAddress)
|
||||
val apiAddress = requiredArgument(optionSet, apiAddress)
|
||||
val baseDirectory = requiredArgument(optionSet, baseDirectory)
|
||||
val nearestCity = requiredArgument(optionSet, nearestCity)
|
||||
val legalName = requiredArgument(optionSet, legalName)
|
||||
|
||||
return CliParams(
|
||||
services = services.map { object : ServiceType(it) {} }.toSet(),
|
||||
messagingAddress = HostAndPort.fromString(messagingAddress),
|
||||
apiAddress = HostAndPort.fromString(apiAddress),
|
||||
baseDirectory = baseDirectory,
|
||||
networkMapName = networkMapName,
|
||||
networkMapPublicKey = networkMapPublicKey,
|
||||
networkMapAddress = networkMapAddress?.let { HostAndPort.fromString(it) },
|
||||
legalName = legalName,
|
||||
nearestCity = nearestCity
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fun toCliArguments(): List<String> {
|
||||
val cliArguments = LinkedList<String>()
|
||||
cliArguments.add("--services")
|
||||
cliArguments.addAll(services.map { it.toString() })
|
||||
if (networkMapName != null) {
|
||||
cliArguments.add("--network-map-name")
|
||||
cliArguments.add(networkMapName)
|
||||
}
|
||||
if (networkMapPublicKey != null) {
|
||||
cliArguments.add("--network-map-public-key")
|
||||
cliArguments.add(networkMapPublicKey.toBase58String())
|
||||
}
|
||||
if (networkMapAddress != null) {
|
||||
cliArguments.add("--network-map-address")
|
||||
cliArguments.add(networkMapAddress.toString())
|
||||
}
|
||||
cliArguments.add("--messaging-address")
|
||||
cliArguments.add(messagingAddress.toString())
|
||||
cliArguments.add("--api-address")
|
||||
cliArguments.add(apiAddress.toString())
|
||||
cliArguments.add("--base-directory")
|
||||
cliArguments.add(baseDirectory.toString())
|
||||
cliArguments.add("--nearest-city")
|
||||
cliArguments.add(nearestCity)
|
||||
cliArguments.add("--legal-name")
|
||||
cliArguments.add(legalName)
|
||||
return cliArguments
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun createNodeRunDirectory(directory: Path) {
|
||||
directory.toFile().mkdirs()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user