From e42d8c2e8f18b8525dd73676eb6b14bd6f7df02d Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 29 Jul 2016 16:42:32 +0100 Subject: [PATCH] node: Add driver DSL for starting up nodes --- .../kotlin/com/r3corda/node/driver/Driver.kt | 244 ++++++++++++++++++ .../com/r3corda/node/driver/NodeRunner.kt | 165 ++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 node/src/main/kotlin/com/r3corda/node/driver/Driver.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt new file mode 100644 index 0000000000..c30a3109cb --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -0,0 +1,244 @@ +package com.r3corda.node.driver + +import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.generateKeyPair +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.ServiceType +import com.r3corda.node.driver.NodeRunner +import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.network.InMemoryNetworkMapCache +import com.r3corda.node.services.network.NetworkMapService +import java.net.Socket +import java.net.SocketException +import java.nio.file.Paths +import java.text.SimpleDateFormat +import java.util.* +import java.util.concurrent.TimeUnit + +/** + * This file defines a small "Driver" DSL for starting up nodes. + * + * The process the driver is run behaves as an Artemis client and starts up other processes. Namely it first + * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes/explorers. + * + * Usage: + * driver { + * startNode(setOf(NotaryService.Type), "Notary") + * val aliceMonitor = startNode(setOf(WalletMonitorService.Type), "Alice") + * startExplorer(aliceMonitor) + * } + * + * The base directory node directories go into may be specified in [driver] and defaults to "build//". + * The node directories themselves are "//", where legalName defaults to + * "-" and may be specified in [DriverDSL.startNode]. + * + * TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done. + * TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow. + * TODO The driver now polls the network map cache for info about newly started up nodes, this could be done asynchronously(?). + * TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought. + */ + +fun driver(baseDirectory: String? = null, dsl: DriverDSL.() -> A): Pair { + val driverDsl = DriverDSL(10000, baseDirectory ?: "build/${getTimestampAsDirectoryName()}") + driverDsl.start() + val returnValue = dsl(driverDsl) + val shutdownHook = Thread({ + driverDsl.shutdown() + }) + Runtime.getRuntime().addShutdownHook(shutdownHook) + return Pair(DriverHandle(driverDsl, shutdownHook), returnValue) +} + +private fun getTimestampAsDirectoryName(): String { + val tz = TimeZone.getTimeZone("UTC") + val df = SimpleDateFormat("yyyyMMddHHmmss") + df.timeZone = tz + return df.format(Date()) +} + +class DriverHandle(private val driverDsl: DriverDSL, private val shutdownHook: Thread) { + val messagingService = driverDsl.messagingService + + fun waitForAllNodesToFinish() { + driverDsl.waitForAllNodesToFinish() + } + + fun shutdown() { + driverDsl.shutdown() + Runtime.getRuntime().removeShutdownHook(shutdownHook) + } +} + +private fun poll(f: () -> A?): A { + var counter = 0 + var result = f() + while (result == null && counter < 30) { + counter++ + Thread.sleep(500) + result = f() + } + if (result == null) { + throw Exception("Poll timed out") + } + return result +} + +class DriverDSL(private var portCounter: Int, val baseDirectory: String) { + + private fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) + + val messagingService = ArtemisMessagingService( + Paths.get(baseDirectory, "driver-artemis"), + nextLocalHostAndPort(), + object : NodeConfiguration { + override val myLegalName = "driver-artemis" + override val exportJMXto = "" + override val nearestCity = "Zion" + override val keyStorePassword = "keypass" + override val trustStorePassword = "trustpass" + } + ) + + val networkMapCache = InMemoryNetworkMapCache(null) + private val networkMapName = "NetworkMapService" + private val networkMapAddress = nextLocalHostAndPort() + private lateinit var networkMapNodeInfo: NodeInfo + private val registeredProcesses = LinkedList() + + private fun nextPort(): Int { + val nextPort = portCounter + portCounter++ + return nextPort + } + + fun registerProcess(process: Process) = registeredProcesses.push(process) + + internal fun waitForAllNodesToFinish() { + registeredProcesses.forEach { + it.waitFor() + } + } + + internal fun shutdown() { + registeredProcesses.forEach { + it.destroy() + } + registeredProcesses.forEach { + if (!it.waitFor(5, TimeUnit.SECONDS)) { + it.destroyForcibly() + } + } + } + + fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo { + val messagingAddress = nextLocalHostAndPort() + val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" + val nearestCity = pickA(city) + + val driverCliParams = NodeRunner.CliParams( + services = advertisedServices, + networkMapName = networkMapNodeInfo.identity.name, + networkMapPublicKey = networkMapNodeInfo.identity.owningKey, + networkMapAddress = networkMapAddress, + messagingAddress = messagingAddress, + apiAddress = nextLocalHostAndPort(), + baseDirectory = baseDirectory, + nearestCity = nearestCity, + legalName = name + ) + registerProcess(startNode(driverCliParams)) + + return poll { + networkMapCache.partyNodes.forEach { + if (it.identity.name == name) { + return@poll it + } + } + null + } + } + + internal fun start() { + messagingService.configureWithDevSSLCertificate() + messagingService.start() + startNetworkMapService() + } + + private fun startNetworkMapService() { + val driverCliParams = NodeRunner.CliParams( + services = setOf(NetworkMapService.Type), + networkMapName = null, + networkMapPublicKey = null, + networkMapAddress = null, + messagingAddress = networkMapAddress, + apiAddress = nextLocalHostAndPort(), + baseDirectory = baseDirectory, + nearestCity = pickA(city), + legalName = networkMapName + ) + println("Starting network-map-service") + registerProcess(startNode(driverCliParams)) + // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from + // the network map service itself + val nodeInfo = NodeInfo( + address = ArtemisMessagingService.makeRecipient(networkMapAddress), + identity = Party( + name = networkMapName, + owningKey = generateKeyPair().public + ), + advertisedServices = setOf(NetworkMapService.Type) + ) + networkMapCache.addMapService(messagingService, nodeInfo, true) + networkMapNodeInfo = poll { + networkMapCache.partyNodes.forEach { + if (it.identity.name == networkMapName) { + return@poll it + } + } + null + } + } + + companion object { + + private val city = arrayOf( + "London", + "Paris", + "New York", + "Tokyo" + ) + private val name = arrayOf( + "Alice", + "Bob", + "EvilBank", + "NotSoEvilBank" + ) + private fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] + + private fun startNode(cliParams: NodeRunner.CliParams): Process { + val className = NodeRunner::class.java.canonicalName + val separator = System.getProperty("file.separator") + val classpath = System.getProperty("java.class.path") + val path = System.getProperty("java.home") + separator + "bin" + separator + "java" + val javaArgs = listOf(path) + + listOf("-Dname=${cliParams.legalName}", "-javaagent:lib/quasar.jar", "-cp", classpath, className) + + cliParams.toCliArguments() + val builder = ProcessBuilder(javaArgs) + builder.redirectError(Paths.get("error.$className.log").toFile()) + builder.inheritIO() + val process = builder.start() + poll { + try { + Socket(cliParams.messagingAddress.hostText, cliParams.messagingAddress.port).close() + Unit + } catch (_exception: SocketException) { + null + } + } + + return process + } + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt new file mode 100644 index 0000000000..896111ce5d --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -0,0 +1,165 @@ +package com.r3corda.node.driver + +import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.toBase58String +import com.r3corda.core.crypto.toPublicKey +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.ServiceType +import com.r3corda.node.internal.Node +import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.network.NetworkMapService +import joptsimple.ArgumentAcceptingOptionSpec +import joptsimple.OptionParser +import joptsimple.OptionSet +import java.nio.file.Path +import java.nio.file.Paths +import java.security.PublicKey +import java.util.* + +class NodeRunner { + companion object { + @JvmStatic fun main(arguments: Array) { + val cliParams = CliParams.parse(CliParams.parser.parse(*arguments)) + + val nodeDirectory = Paths.get(cliParams.baseDirectory, cliParams.legalName) + createNodeRunDirectory(nodeDirectory) + + with(cliParams) { + + val networkMapNodeInfo = + if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) { + NodeInfo( + address = ArtemisMessagingService.makeRecipient(networkMapAddress), + identity = Party( + name = networkMapName, + owningKey = networkMapPublicKey + ), + advertisedServices = setOf(NetworkMapService.Type) + ) + } else { + null + } + + val node = Node( + dir = nodeDirectory, + p2pAddr = messagingAddress, + webServerAddr = apiAddress, + configuration = object : NodeConfiguration { + override val myLegalName = legalName + override val exportJMXto = "" + override val nearestCity = cliParams.nearestCity + override val keyStorePassword = "keypass" + override val trustStorePassword = "trustpass" + }, + networkMapAddress = networkMapNodeInfo, + advertisedServices = services.toSet() + ) + + + println("Starting $legalName with services $services on addresses $messagingAddress and $apiAddress") + node.start() + } + } + } + + class CliParams ( + val services: Set, + val networkMapName: String?, + val networkMapPublicKey: PublicKey?, + val networkMapAddress: HostAndPort?, + val messagingAddress: HostAndPort, + val apiAddress: HostAndPort, + val baseDirectory: String, + val legalName: String, + val nearestCity: String + ) { + + companion object { + val parser = OptionParser() + val services = + parser.accepts("services").withRequiredArg().ofType(String::class.java) + val networkMapName = + parser.accepts("network-map-name").withOptionalArg().ofType(String::class.java) + val networkMapPublicKey = + parser.accepts("network-map-public-key").withOptionalArg().ofType(String::class.java) + val networkMapAddress = + parser.accepts("network-map-address").withOptionalArg().ofType(String::class.java) + val messagingAddress = + parser.accepts("messaging-address").withRequiredArg().ofType(String::class.java) + val apiAddress = + parser.accepts("api-address").withRequiredArg().ofType(String::class.java) + val baseDirectory = + parser.accepts("base-directory").withRequiredArg().ofType(String::class.java) + val nearestCity = + parser.accepts("nearest-city").withRequiredArg().ofType(String::class.java) + val legalName = + parser.accepts("legal-name").withRequiredArg().ofType(String::class.java) + + private fun requiredArgument(optionSet: OptionSet, spec: ArgumentAcceptingOptionSpec) = + optionSet.valueOf(spec) ?: throw IllegalArgumentException("Must provide $spec") + + fun parse(optionSet: OptionSet): CliParams { + val services = optionSet.valuesOf(services) + if (services.isEmpty()) { + throw IllegalArgumentException("Must provide at least one --services") + } + val networkMapName = optionSet.valueOf(networkMapName) + val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.toPublicKey() + val networkMapAddress = optionSet.valueOf(networkMapAddress) + val messagingAddress = requiredArgument(optionSet, messagingAddress) + val apiAddress = requiredArgument(optionSet, apiAddress) + val baseDirectory = requiredArgument(optionSet, baseDirectory) + val nearestCity = requiredArgument(optionSet, nearestCity) + val legalName = requiredArgument(optionSet, legalName) + + return CliParams( + services = services.map { object : ServiceType(it) {} }.toSet(), + messagingAddress = HostAndPort.fromString(messagingAddress), + apiAddress = HostAndPort.fromString(apiAddress), + baseDirectory = baseDirectory, + networkMapName = networkMapName, + networkMapPublicKey = networkMapPublicKey, + networkMapAddress = networkMapAddress?.let { HostAndPort.fromString(it) }, + legalName = legalName, + nearestCity = nearestCity + ) + } + } + + fun toCliArguments(): List { + val cliArguments = LinkedList() + cliArguments.add("--services") + cliArguments.addAll(services.map { it.toString() }) + if (networkMapName != null) { + cliArguments.add("--network-map-name") + cliArguments.add(networkMapName) + } + if (networkMapPublicKey != null) { + cliArguments.add("--network-map-public-key") + cliArguments.add(networkMapPublicKey.toBase58String()) + } + if (networkMapAddress != null) { + cliArguments.add("--network-map-address") + cliArguments.add(networkMapAddress.toString()) + } + cliArguments.add("--messaging-address") + cliArguments.add(messagingAddress.toString()) + cliArguments.add("--api-address") + cliArguments.add(apiAddress.toString()) + cliArguments.add("--base-directory") + cliArguments.add(baseDirectory.toString()) + cliArguments.add("--nearest-city") + cliArguments.add(nearestCity) + cliArguments.add("--legal-name") + cliArguments.add(legalName) + return cliArguments + } + } +} + +fun createNodeRunDirectory(directory: Path) { + directory.toFile().mkdirs() +} +