From e42d8c2e8f18b8525dd73676eb6b14bd6f7df02d Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 29 Jul 2016 16:42:32 +0100 Subject: [PATCH 01/20] node: Add driver DSL for starting up nodes --- .../kotlin/com/r3corda/node/driver/Driver.kt | 244 ++++++++++++++++++ .../com/r3corda/node/driver/NodeRunner.kt | 165 ++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 node/src/main/kotlin/com/r3corda/node/driver/Driver.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt new file mode 100644 index 0000000000..c30a3109cb --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -0,0 +1,244 @@ +package com.r3corda.node.driver + +import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.generateKeyPair +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.ServiceType +import com.r3corda.node.driver.NodeRunner +import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.network.InMemoryNetworkMapCache +import com.r3corda.node.services.network.NetworkMapService +import java.net.Socket +import java.net.SocketException +import java.nio.file.Paths +import java.text.SimpleDateFormat +import java.util.* +import java.util.concurrent.TimeUnit + +/** + * This file defines a small "Driver" DSL for starting up nodes. + * + * The process the driver is run behaves as an Artemis client and starts up other processes. Namely it first + * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes/explorers. + * + * Usage: + * driver { + * startNode(setOf(NotaryService.Type), "Notary") + * val aliceMonitor = startNode(setOf(WalletMonitorService.Type), "Alice") + * startExplorer(aliceMonitor) + * } + * + * The base directory node directories go into may be specified in [driver] and defaults to "build//". + * The node directories themselves are "//", where legalName defaults to + * "-" and may be specified in [DriverDSL.startNode]. + * + * TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done. + * TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow. + * TODO The driver now polls the network map cache for info about newly started up nodes, this could be done asynchronously(?). + * TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought. + */ + +fun driver(baseDirectory: String? = null, dsl: DriverDSL.() -> A): Pair { + val driverDsl = DriverDSL(10000, baseDirectory ?: "build/${getTimestampAsDirectoryName()}") + driverDsl.start() + val returnValue = dsl(driverDsl) + val shutdownHook = Thread({ + driverDsl.shutdown() + }) + Runtime.getRuntime().addShutdownHook(shutdownHook) + return Pair(DriverHandle(driverDsl, shutdownHook), returnValue) +} + +private fun getTimestampAsDirectoryName(): String { + val tz = TimeZone.getTimeZone("UTC") + val df = SimpleDateFormat("yyyyMMddHHmmss") + df.timeZone = tz + return df.format(Date()) +} + +class DriverHandle(private val driverDsl: DriverDSL, private val shutdownHook: Thread) { + val messagingService = driverDsl.messagingService + + fun waitForAllNodesToFinish() { + driverDsl.waitForAllNodesToFinish() + } + + fun shutdown() { + driverDsl.shutdown() + Runtime.getRuntime().removeShutdownHook(shutdownHook) + } +} + +private fun poll(f: () -> A?): A { + var counter = 0 + var result = f() + while (result == null && counter < 30) { + counter++ + Thread.sleep(500) + result = f() + } + if (result == null) { + throw Exception("Poll timed out") + } + return result +} + +class DriverDSL(private var portCounter: Int, val baseDirectory: String) { + + private fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) + + val messagingService = ArtemisMessagingService( + Paths.get(baseDirectory, "driver-artemis"), + nextLocalHostAndPort(), + object : NodeConfiguration { + override val myLegalName = "driver-artemis" + override val exportJMXto = "" + override val nearestCity = "Zion" + override val keyStorePassword = "keypass" + override val trustStorePassword = "trustpass" + } + ) + + val networkMapCache = InMemoryNetworkMapCache(null) + private val networkMapName = "NetworkMapService" + private val networkMapAddress = nextLocalHostAndPort() + private lateinit var networkMapNodeInfo: NodeInfo + private val registeredProcesses = LinkedList() + + private fun nextPort(): Int { + val nextPort = portCounter + portCounter++ + return nextPort + } + + fun registerProcess(process: Process) = registeredProcesses.push(process) + + internal fun waitForAllNodesToFinish() { + registeredProcesses.forEach { + it.waitFor() + } + } + + internal fun shutdown() { + registeredProcesses.forEach { + it.destroy() + } + registeredProcesses.forEach { + if (!it.waitFor(5, TimeUnit.SECONDS)) { + it.destroyForcibly() + } + } + } + + fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo { + val messagingAddress = nextLocalHostAndPort() + val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" + val nearestCity = pickA(city) + + val driverCliParams = NodeRunner.CliParams( + services = advertisedServices, + networkMapName = networkMapNodeInfo.identity.name, + networkMapPublicKey = networkMapNodeInfo.identity.owningKey, + networkMapAddress = networkMapAddress, + messagingAddress = messagingAddress, + apiAddress = nextLocalHostAndPort(), + baseDirectory = baseDirectory, + nearestCity = nearestCity, + legalName = name + ) + registerProcess(startNode(driverCliParams)) + + return poll { + networkMapCache.partyNodes.forEach { + if (it.identity.name == name) { + return@poll it + } + } + null + } + } + + internal fun start() { + messagingService.configureWithDevSSLCertificate() + messagingService.start() + startNetworkMapService() + } + + private fun startNetworkMapService() { + val driverCliParams = NodeRunner.CliParams( + services = setOf(NetworkMapService.Type), + networkMapName = null, + networkMapPublicKey = null, + networkMapAddress = null, + messagingAddress = networkMapAddress, + apiAddress = nextLocalHostAndPort(), + baseDirectory = baseDirectory, + nearestCity = pickA(city), + legalName = networkMapName + ) + println("Starting network-map-service") + registerProcess(startNode(driverCliParams)) + // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from + // the network map service itself + val nodeInfo = NodeInfo( + address = ArtemisMessagingService.makeRecipient(networkMapAddress), + identity = Party( + name = networkMapName, + owningKey = generateKeyPair().public + ), + advertisedServices = setOf(NetworkMapService.Type) + ) + networkMapCache.addMapService(messagingService, nodeInfo, true) + networkMapNodeInfo = poll { + networkMapCache.partyNodes.forEach { + if (it.identity.name == networkMapName) { + return@poll it + } + } + null + } + } + + companion object { + + private val city = arrayOf( + "London", + "Paris", + "New York", + "Tokyo" + ) + private val name = arrayOf( + "Alice", + "Bob", + "EvilBank", + "NotSoEvilBank" + ) + private fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] + + private fun startNode(cliParams: NodeRunner.CliParams): Process { + val className = NodeRunner::class.java.canonicalName + val separator = System.getProperty("file.separator") + val classpath = System.getProperty("java.class.path") + val path = System.getProperty("java.home") + separator + "bin" + separator + "java" + val javaArgs = listOf(path) + + listOf("-Dname=${cliParams.legalName}", "-javaagent:lib/quasar.jar", "-cp", classpath, className) + + cliParams.toCliArguments() + val builder = ProcessBuilder(javaArgs) + builder.redirectError(Paths.get("error.$className.log").toFile()) + builder.inheritIO() + val process = builder.start() + poll { + try { + Socket(cliParams.messagingAddress.hostText, cliParams.messagingAddress.port).close() + Unit + } catch (_exception: SocketException) { + null + } + } + + return process + } + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt new file mode 100644 index 0000000000..896111ce5d --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -0,0 +1,165 @@ +package com.r3corda.node.driver + +import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.toBase58String +import com.r3corda.core.crypto.toPublicKey +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.ServiceType +import com.r3corda.node.internal.Node +import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.network.NetworkMapService +import joptsimple.ArgumentAcceptingOptionSpec +import joptsimple.OptionParser +import joptsimple.OptionSet +import java.nio.file.Path +import java.nio.file.Paths +import java.security.PublicKey +import java.util.* + +class NodeRunner { + companion object { + @JvmStatic fun main(arguments: Array) { + val cliParams = CliParams.parse(CliParams.parser.parse(*arguments)) + + val nodeDirectory = Paths.get(cliParams.baseDirectory, cliParams.legalName) + createNodeRunDirectory(nodeDirectory) + + with(cliParams) { + + val networkMapNodeInfo = + if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) { + NodeInfo( + address = ArtemisMessagingService.makeRecipient(networkMapAddress), + identity = Party( + name = networkMapName, + owningKey = networkMapPublicKey + ), + advertisedServices = setOf(NetworkMapService.Type) + ) + } else { + null + } + + val node = Node( + dir = nodeDirectory, + p2pAddr = messagingAddress, + webServerAddr = apiAddress, + configuration = object : NodeConfiguration { + override val myLegalName = legalName + override val exportJMXto = "" + override val nearestCity = cliParams.nearestCity + override val keyStorePassword = "keypass" + override val trustStorePassword = "trustpass" + }, + networkMapAddress = networkMapNodeInfo, + advertisedServices = services.toSet() + ) + + + println("Starting $legalName with services $services on addresses $messagingAddress and $apiAddress") + node.start() + } + } + } + + class CliParams ( + val services: Set, + val networkMapName: String?, + val networkMapPublicKey: PublicKey?, + val networkMapAddress: HostAndPort?, + val messagingAddress: HostAndPort, + val apiAddress: HostAndPort, + val baseDirectory: String, + val legalName: String, + val nearestCity: String + ) { + + companion object { + val parser = OptionParser() + val services = + parser.accepts("services").withRequiredArg().ofType(String::class.java) + val networkMapName = + parser.accepts("network-map-name").withOptionalArg().ofType(String::class.java) + val networkMapPublicKey = + parser.accepts("network-map-public-key").withOptionalArg().ofType(String::class.java) + val networkMapAddress = + parser.accepts("network-map-address").withOptionalArg().ofType(String::class.java) + val messagingAddress = + parser.accepts("messaging-address").withRequiredArg().ofType(String::class.java) + val apiAddress = + parser.accepts("api-address").withRequiredArg().ofType(String::class.java) + val baseDirectory = + parser.accepts("base-directory").withRequiredArg().ofType(String::class.java) + val nearestCity = + parser.accepts("nearest-city").withRequiredArg().ofType(String::class.java) + val legalName = + parser.accepts("legal-name").withRequiredArg().ofType(String::class.java) + + private fun requiredArgument(optionSet: OptionSet, spec: ArgumentAcceptingOptionSpec) = + optionSet.valueOf(spec) ?: throw IllegalArgumentException("Must provide $spec") + + fun parse(optionSet: OptionSet): CliParams { + val services = optionSet.valuesOf(services) + if (services.isEmpty()) { + throw IllegalArgumentException("Must provide at least one --services") + } + val networkMapName = optionSet.valueOf(networkMapName) + val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.toPublicKey() + val networkMapAddress = optionSet.valueOf(networkMapAddress) + val messagingAddress = requiredArgument(optionSet, messagingAddress) + val apiAddress = requiredArgument(optionSet, apiAddress) + val baseDirectory = requiredArgument(optionSet, baseDirectory) + val nearestCity = requiredArgument(optionSet, nearestCity) + val legalName = requiredArgument(optionSet, legalName) + + return CliParams( + services = services.map { object : ServiceType(it) {} }.toSet(), + messagingAddress = HostAndPort.fromString(messagingAddress), + apiAddress = HostAndPort.fromString(apiAddress), + baseDirectory = baseDirectory, + networkMapName = networkMapName, + networkMapPublicKey = networkMapPublicKey, + networkMapAddress = networkMapAddress?.let { HostAndPort.fromString(it) }, + legalName = legalName, + nearestCity = nearestCity + ) + } + } + + fun toCliArguments(): List { + val cliArguments = LinkedList() + cliArguments.add("--services") + cliArguments.addAll(services.map { it.toString() }) + if (networkMapName != null) { + cliArguments.add("--network-map-name") + cliArguments.add(networkMapName) + } + if (networkMapPublicKey != null) { + cliArguments.add("--network-map-public-key") + cliArguments.add(networkMapPublicKey.toBase58String()) + } + if (networkMapAddress != null) { + cliArguments.add("--network-map-address") + cliArguments.add(networkMapAddress.toString()) + } + cliArguments.add("--messaging-address") + cliArguments.add(messagingAddress.toString()) + cliArguments.add("--api-address") + cliArguments.add(apiAddress.toString()) + cliArguments.add("--base-directory") + cliArguments.add(baseDirectory.toString()) + cliArguments.add("--nearest-city") + cliArguments.add(nearestCity) + cliArguments.add("--legal-name") + cliArguments.add(legalName) + return cliArguments + } + } +} + +fun createNodeRunDirectory(directory: Path) { + directory.toFile().mkdirs() +} + From a317a5bfd24feee63ac25a4d293ff854be15a9c6 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 1 Aug 2016 10:35:49 +0100 Subject: [PATCH 02/20] node: Expose some Driver functions to allow extension, some cleanup --- .../kotlin/com/r3corda/node/driver/Driver.kt | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index c30a3109cb..95ef619bd8 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -5,7 +5,6 @@ import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType -import com.r3corda.node.driver.NodeRunner import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.network.InMemoryNetworkMapCache @@ -15,7 +14,9 @@ import java.net.SocketException import java.nio.file.Paths import java.text.SimpleDateFormat import java.util.* +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException /** * This file defines a small "Driver" DSL for starting up nodes. @@ -27,7 +28,6 @@ import java.util.concurrent.TimeUnit * driver { * startNode(setOf(NotaryService.Type), "Notary") * val aliceMonitor = startNode(setOf(WalletMonitorService.Type), "Alice") - * startExplorer(aliceMonitor) * } * * The base directory node directories go into may be specified in [driver] and defaults to "build//". @@ -40,6 +40,10 @@ import java.util.concurrent.TimeUnit * TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought. */ +interface DriverDSLInterface { + fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo +} + fun driver(baseDirectory: String? = null, dsl: DriverDSL.() -> A): Pair { val driverDsl = DriverDSL(10000, baseDirectory ?: "build/${getTimestampAsDirectoryName()}") driverDsl.start() @@ -85,9 +89,9 @@ private fun poll(f: () -> A?): A { return result } -class DriverDSL(private var portCounter: Int, val baseDirectory: String) { +class DriverDSL(private var portCounter: Int, val baseDirectory: String) : DriverDSLInterface { - private fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) + fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) val messagingService = ArtemisMessagingService( Paths.get(baseDirectory, "driver-artemis"), @@ -125,14 +129,21 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String) { registeredProcesses.forEach { it.destroy() } - registeredProcesses.forEach { - if (!it.waitFor(5, TimeUnit.SECONDS)) { + /** Wait 5 seconds, then [Process.destroyForcibly] */ + val finishedFuture = Executors.newSingleThreadExecutor().submit { + waitForAllNodesToFinish() + } + try { + finishedFuture.get(5, TimeUnit.SECONDS) + } catch (exception: TimeoutException) { + finishedFuture.cancel(true) + registeredProcesses.forEach { it.destroyForcibly() } } } - fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo { + override fun startNode(advertisedServices: Set, providedName: String?): NodeInfo { val messagingAddress = nextLocalHostAndPort() val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" val nearestCity = pickA(city) @@ -203,19 +214,19 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String) { companion object { - private val city = arrayOf( + val city = arrayOf( "London", "Paris", "New York", "Tokyo" ) - private val name = arrayOf( + val name = arrayOf( "Alice", "Bob", "EvilBank", "NotSoEvilBank" ) - private fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] + fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] private fun startNode(cliParams: NodeRunner.CliParams): Process { val className = NodeRunner::class.java.canonicalName From 5f5a5e683d15a50820847a066e70d2d2b3880187 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 1 Aug 2016 14:06:35 +0100 Subject: [PATCH 03/20] node: Add quasarPath parameter to driver dsl (should be replaced once we can do without the external quasar.jar) --- .../kotlin/com/r3corda/node/driver/Driver.kt | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 95ef619bd8..5f7d9bed7f 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -44,8 +44,12 @@ interface DriverDSLInterface { fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo } -fun driver(baseDirectory: String? = null, dsl: DriverDSL.() -> A): Pair { - val driverDsl = DriverDSL(10000, baseDirectory ?: "build/${getTimestampAsDirectoryName()}") +fun driver(baseDirectory: String? = null, quasarPath: String? = null, dsl: DriverDSL.() -> A): Pair { + val driverDsl = DriverDSL( + portCounter = 10000, + baseDirectory = baseDirectory ?: "build/${getTimestampAsDirectoryName()}", + quasarPath = quasarPath ?: "lib/quasar.jar" + ) driverDsl.start() val returnValue = dsl(driverDsl) val shutdownHook = Thread({ @@ -89,7 +93,7 @@ private fun poll(f: () -> A?): A { return result } -class DriverDSL(private var portCounter: Int, val baseDirectory: String) : DriverDSLInterface { +class DriverDSL(private var portCounter: Int, val baseDirectory: String, val quasarPath: String) : DriverDSLInterface { fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) @@ -159,7 +163,7 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String) : Drive nearestCity = nearestCity, legalName = name ) - registerProcess(startNode(driverCliParams)) + registerProcess(startNode(driverCliParams, quasarPath)) return poll { networkMapCache.partyNodes.forEach { @@ -190,7 +194,7 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String) : Drive legalName = networkMapName ) println("Starting network-map-service") - registerProcess(startNode(driverCliParams)) + registerProcess(startNode(driverCliParams, quasarPath)) // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // the network map service itself val nodeInfo = NodeInfo( @@ -228,13 +232,13 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String) : Drive ) fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] - private fun startNode(cliParams: NodeRunner.CliParams): Process { + private fun startNode(cliParams: NodeRunner.CliParams, quasarPath: String): Process { val className = NodeRunner::class.java.canonicalName val separator = System.getProperty("file.separator") val classpath = System.getProperty("java.class.path") val path = System.getProperty("java.home") + separator + "bin" + separator + "java" val javaArgs = listOf(path) + - listOf("-Dname=${cliParams.legalName}", "-javaagent:lib/quasar.jar", "-cp", classpath, className) + + listOf("-Dname=${cliParams.legalName}", "-javaagent:$quasarPath", "-cp", classpath, className) + cliParams.toCliArguments() val builder = ProcessBuilder(javaArgs) builder.redirectError(Paths.get("error.$className.log").toFile()) From 3bc62fdb950df84e42f9617433e6cb57ca9f759d Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 1 Aug 2016 14:07:59 +0100 Subject: [PATCH 04/20] node: Add driver dsl test, expose some needed functions --- .../kotlin/com/r3corda/node/driver/Driver.kt | 3 +- .../com/r3corda/node/driver/DriverTests.kt | 48 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) create mode 100644 node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 5f7d9bed7f..f89c6c173e 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -68,6 +68,7 @@ private fun getTimestampAsDirectoryName(): String { class DriverHandle(private val driverDsl: DriverDSL, private val shutdownHook: Thread) { val messagingService = driverDsl.messagingService + val networkMapCache = driverDsl.networkMapCache fun waitForAllNodesToFinish() { driverDsl.waitForAllNodesToFinish() @@ -79,7 +80,7 @@ class DriverHandle(private val driverDsl: DriverDSL, private val shutdownHook: T } } -private fun poll(f: () -> A?): A { +fun poll(f: () -> A?): A { var counter = 0 var result = f() while (result == null && counter < 30) { diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt new file mode 100644 index 0000000000..1c34558226 --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -0,0 +1,48 @@ +package com.r3corda.node.driver + +import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.transactions.NotaryService +import org.junit.Test +import java.net.Socket +import java.net.SocketException + + +class DriverTests { + + @Test + fun simpleNodeStartupShutdownWorks() { + + // Start a notary + val (handle, notaryNodeInfo) = driver(quasarPath = "../lib/quasar.jar") { + startNode(setOf(NotaryService.Type), "TestNotary") + } + // Check that the node is registered in the network map + poll { + handle.networkMapCache.get(NotaryService.Type).firstOrNull { + it.identity.name == "TestNotary" + } + } + // Check that the port is bound + val address = notaryNodeInfo.address as ArtemisMessagingService.Address + poll { + try { + Socket(address.hostAndPort.hostText, address.hostAndPort.port).close() + Unit + } catch (_exception: SocketException) { + null + } + } + + // Shutdown + handle.shutdown() + // Check that the port is not bound + poll { + try { + Socket(address.hostAndPort.hostText, address.hostAndPort.port).close() + null + } catch (_exception: SocketException) { + Unit + } + } + } +} From 8cf635cf74e205e296c466207416ec1ad3a23745 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 1 Aug 2016 14:08:51 +0100 Subject: [PATCH 05/20] node: Expose ArtemisMessagingService.Address --- .../kotlin/com/r3corda/node/driver/Driver.kt | 62 ++++++++++--------- .../com/r3corda/node/driver/NodeRunner.kt | 8 +-- .../messaging/ArtemisMessagingComponent.kt | 2 +- .../com/r3corda/node/driver/DriverTests.kt | 4 +- 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index f89c6c173e..248dcb0d1a 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -6,7 +6,8 @@ import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingClient +import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import java.net.Socket @@ -98,24 +99,25 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) - val messagingService = ArtemisMessagingService( + val networkMapCache = InMemoryNetworkMapCache(null) + private val networkMapName = "NetworkMapService" + private val networkMapAddress = nextLocalHostAndPort() + private lateinit var networkMapNodeInfo: NodeInfo + private val registeredProcesses = LinkedList() + + val messagingService = ArtemisMessagingClient( Paths.get(baseDirectory, "driver-artemis"), - nextLocalHostAndPort(), object : NodeConfiguration { override val myLegalName = "driver-artemis" override val exportJMXto = "" override val nearestCity = "Zion" override val keyStorePassword = "keypass" override val trustStorePassword = "trustpass" - } + }, + serverHostPort = networkMapAddress, + myHostPort = nextLocalHostAndPort() ) - val networkMapCache = InMemoryNetworkMapCache(null) - private val networkMapName = "NetworkMapService" - private val networkMapAddress = nextLocalHostAndPort() - private lateinit var networkMapNodeInfo: NodeInfo - private val registeredProcesses = LinkedList() - private fun nextPort(): Int { val nextPort = portCounter portCounter++ @@ -177,9 +179,28 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua } internal fun start() { + startNetworkMapService() messagingService.configureWithDevSSLCertificate() messagingService.start() - startNetworkMapService() + // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from + // the network map service itself + val nodeInfo = NodeInfo( + address = ArtemisMessagingClient.makeRecipient(networkMapAddress), + identity = Party( + name = networkMapName, + owningKey = generateKeyPair().public + ), + advertisedServices = setOf(NetworkMapService.Type) + ) + networkMapCache.addMapService(messagingService, nodeInfo, true) + networkMapNodeInfo = poll { + networkMapCache.partyNodes.forEach { + if (it.identity.name == networkMapName) { + return@poll it + } + } + null + } } private fun startNetworkMapService() { @@ -196,25 +217,6 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua ) println("Starting network-map-service") registerProcess(startNode(driverCliParams, quasarPath)) - // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from - // the network map service itself - val nodeInfo = NodeInfo( - address = ArtemisMessagingService.makeRecipient(networkMapAddress), - identity = Party( - name = networkMapName, - owningKey = generateKeyPair().public - ), - advertisedServices = setOf(NetworkMapService.Type) - ) - networkMapCache.addMapService(messagingService, nodeInfo, true) - networkMapNodeInfo = poll { - networkMapCache.partyNodes.forEach { - if (it.identity.name == networkMapName) { - return@poll it - } - } - null - } } companion object { diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 896111ce5d..1fa733aec3 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -2,13 +2,13 @@ package com.r3corda.node.driver import com.google.common.net.HostAndPort import com.r3corda.core.crypto.Party +import com.r3corda.core.crypto.parsePublicKeyBase58 import com.r3corda.core.crypto.toBase58String -import com.r3corda.core.crypto.toPublicKey import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.node.internal.Node import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import joptsimple.ArgumentAcceptingOptionSpec import joptsimple.OptionParser @@ -31,7 +31,7 @@ class NodeRunner { val networkMapNodeInfo = if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) { NodeInfo( - address = ArtemisMessagingService.makeRecipient(networkMapAddress), + address = ArtemisMessagingClient.makeRecipient(networkMapAddress), identity = Party( name = networkMapName, owningKey = networkMapPublicKey @@ -106,7 +106,7 @@ class NodeRunner { throw IllegalArgumentException("Must provide at least one --services") } val networkMapName = optionSet.valueOf(networkMapName) - val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.toPublicKey() + val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.let { parsePublicKeyBase58(it) } val networkMapAddress = optionSet.valueOf(networkMapAddress) val messagingAddress = requiredArgument(optionSet, messagingAddress) val apiAddress = requiredArgument(optionSet, apiAddress) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt index cb190f0af2..39231d4e09 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -23,7 +23,7 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks") // In future: can contain onion routing info, etc. - protected data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient + data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient protected enum class ConnectionDirection { INBOUND, OUTBOUND } diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 1c34558226..8af22c0e43 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -1,6 +1,6 @@ package com.r3corda.node.driver -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.transactions.NotaryService import org.junit.Test import java.net.Socket @@ -23,7 +23,7 @@ class DriverTests { } } // Check that the port is bound - val address = notaryNodeInfo.address as ArtemisMessagingService.Address + val address = notaryNodeInfo.address as ArtemisMessagingComponent.Address poll { try { Socket(address.hostAndPort.hostText, address.hostAndPort.port).close() From 35da6b538865c28ae2d2bbc03d56f580cfe61206 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 1 Aug 2016 18:31:19 +0100 Subject: [PATCH 06/20] node: Add port allocation strategies to driver dsl, add jvm debug port --- .../kotlin/com/r3corda/node/driver/Driver.kt | 71 +++++++++++++------ 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 248dcb0d1a..4a9a11853d 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -7,9 +7,9 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient -import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService +import java.net.ServerSocket import java.net.Socket import java.net.SocketException import java.nio.file.Paths @@ -45,11 +45,33 @@ interface DriverDSLInterface { fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo } -fun driver(baseDirectory: String? = null, quasarPath: String? = null, dsl: DriverDSL.() -> A): Pair { +sealed class PortAllocation { + abstract fun nextPort(): Int + fun nextHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) + + class Incremental(private var portCounter: Int) : PortAllocation() { + override fun nextPort() = portCounter++ + } + class RandomFree(): PortAllocation() { + override fun nextPort() = ServerSocket(0).use { it.localPort } + } +} + +/** + * TODO: remove quasarJarPath once we have a proper way of bundling quasar + */ +fun driver( + baseDirectory: String = "build/${getTimestampAsDirectoryName()}", + quasarJarPath: String = "lib/quasar.jar", + portAllocation: PortAllocation = PortAllocation.Incremental(10000), + debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), + dsl: DriverDSL.() -> A +): Pair { val driverDsl = DriverDSL( - portCounter = 10000, - baseDirectory = baseDirectory ?: "build/${getTimestampAsDirectoryName()}", - quasarPath = quasarPath ?: "lib/quasar.jar" + portAllocation = portAllocation, + debugPortAllocation = debugPortAllocation, + baseDirectory = baseDirectory, + quasarJarPath = quasarJarPath ) driverDsl.start() val returnValue = dsl(driverDsl) @@ -95,13 +117,16 @@ fun poll(f: () -> A?): A { return result } -class DriverDSL(private var portCounter: Int, val baseDirectory: String, val quasarPath: String) : DriverDSLInterface { - - fun nextLocalHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) +class DriverDSL( + private val portAllocation: PortAllocation, + private val debugPortAllocation: PortAllocation, + val baseDirectory: String, + val quasarJarPath: String +) : DriverDSLInterface { val networkMapCache = InMemoryNetworkMapCache(null) private val networkMapName = "NetworkMapService" - private val networkMapAddress = nextLocalHostAndPort() + private val networkMapAddress = portAllocation.nextHostAndPort() private lateinit var networkMapNodeInfo: NodeInfo private val registeredProcesses = LinkedList() @@ -115,15 +140,9 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua override val trustStorePassword = "trustpass" }, serverHostPort = networkMapAddress, - myHostPort = nextLocalHostAndPort() + myHostPort = portAllocation.nextHostAndPort() ) - private fun nextPort(): Int { - val nextPort = portCounter - portCounter++ - return nextPort - } - fun registerProcess(process: Process) = registeredProcesses.push(process) internal fun waitForAllNodesToFinish() { @@ -151,7 +170,9 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua } override fun startNode(advertisedServices: Set, providedName: String?): NodeInfo { - val messagingAddress = nextLocalHostAndPort() + val messagingAddress = portAllocation.nextHostAndPort() + val apiAddress = portAllocation.nextHostAndPort() + val debugPort = debugPortAllocation.nextPort() val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" val nearestCity = pickA(city) @@ -161,12 +182,12 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua networkMapPublicKey = networkMapNodeInfo.identity.owningKey, networkMapAddress = networkMapAddress, messagingAddress = messagingAddress, - apiAddress = nextLocalHostAndPort(), + apiAddress = apiAddress, baseDirectory = baseDirectory, nearestCity = nearestCity, legalName = name ) - registerProcess(startNode(driverCliParams, quasarPath)) + registerProcess(startNode(driverCliParams, quasarJarPath, debugPort)) return poll { networkMapCache.partyNodes.forEach { @@ -204,19 +225,21 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua } private fun startNetworkMapService() { + val apiAddress = portAllocation.nextHostAndPort() + val debugPort = debugPortAllocation.nextPort() val driverCliParams = NodeRunner.CliParams( services = setOf(NetworkMapService.Type), networkMapName = null, networkMapPublicKey = null, networkMapAddress = null, messagingAddress = networkMapAddress, - apiAddress = nextLocalHostAndPort(), + apiAddress = apiAddress, baseDirectory = baseDirectory, nearestCity = pickA(city), legalName = networkMapName ) println("Starting network-map-service") - registerProcess(startNode(driverCliParams, quasarPath)) + registerProcess(startNode(driverCliParams, quasarJarPath, debugPort)) } companion object { @@ -235,13 +258,15 @@ class DriverDSL(private var portCounter: Int, val baseDirectory: String, val qua ) fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] - private fun startNode(cliParams: NodeRunner.CliParams, quasarPath: String): Process { + private fun startNode(cliParams: NodeRunner.CliParams, quasarJarPath: String, debugPort: Int): Process { val className = NodeRunner::class.java.canonicalName val separator = System.getProperty("file.separator") val classpath = System.getProperty("java.class.path") val path = System.getProperty("java.home") + separator + "bin" + separator + "java" val javaArgs = listOf(path) + - listOf("-Dname=${cliParams.legalName}", "-javaagent:$quasarPath", "-cp", classpath, className) + + listOf("-Dname=${cliParams.legalName}", "-javaagent:$quasarJarPath", + "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort", + "-cp", classpath, className) + cliParams.toCliArguments() val builder = ProcessBuilder(javaArgs) builder.redirectError(Paths.get("error.$className.log").toFile()) From 303858c3e457a0ed4244300ee8fd1d54f788f05d Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 2 Aug 2016 10:22:42 +0100 Subject: [PATCH 07/20] node: Use proper logging in Driver --- node/src/main/kotlin/com/r3corda/node/driver/Driver.kt | 6 +++++- node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt | 7 +++++-- {src => node/src}/main/resources/reference.conf | 0 3 files changed, 10 insertions(+), 3 deletions(-) rename {src => node/src}/main/resources/reference.conf (100%) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 4a9a11853d..2892a24b54 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -9,6 +9,8 @@ import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.net.ServerSocket import java.net.Socket import java.net.SocketException @@ -57,6 +59,8 @@ sealed class PortAllocation { } } +private val log: Logger = LoggerFactory.getLogger("Driver") + /** * TODO: remove quasarJarPath once we have a proper way of bundling quasar */ @@ -238,7 +242,7 @@ class DriverDSL( nearestCity = pickA(city), legalName = networkMapName ) - println("Starting network-map-service") + log.info("Starting network-map-service") registerProcess(startNode(driverCliParams, quasarJarPath, debugPort)) } diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 1fa733aec3..2214d40f67 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -13,11 +13,15 @@ import com.r3corda.node.services.network.NetworkMapService import joptsimple.ArgumentAcceptingOptionSpec import joptsimple.OptionParser import joptsimple.OptionSet +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.nio.file.Path import java.nio.file.Paths import java.security.PublicKey import java.util.* +private val log: Logger = LoggerFactory.getLogger("NodeRunner") + class NodeRunner { companion object { @JvmStatic fun main(arguments: Array) { @@ -57,8 +61,7 @@ class NodeRunner { advertisedServices = services.toSet() ) - - println("Starting $legalName with services $services on addresses $messagingAddress and $apiAddress") + log.info("Starting $legalName with services $services on addresses $messagingAddress and $apiAddress") node.start() } } diff --git a/src/main/resources/reference.conf b/node/src/main/resources/reference.conf similarity index 100% rename from src/main/resources/reference.conf rename to node/src/main/resources/reference.conf From 773d53b4c8d586ec1876a2c406f7b363218baac3 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 2 Aug 2016 10:23:27 +0100 Subject: [PATCH 08/20] node: Load config from disk in Driver, further cleanup --- .../kotlin/com/r3corda/node/driver/Driver.kt | 39 +++++++++------- .../com/r3corda/node/driver/NodeRunner.kt | 44 ++++++++++--------- .../node/services/config/NodeConfiguration.kt | 18 +++++++- .../com/r3corda/node/driver/DriverTests.kt | 2 +- 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 2892a24b54..76a3016e23 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -7,10 +7,15 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient +import com.r3corda.node.services.config.NodeConfigurationFromConfig +import com.r3corda.node.services.config.copy import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.io.File import java.net.ServerSocket import java.net.Socket import java.net.SocketException @@ -66,6 +71,7 @@ private val log: Logger = LoggerFactory.getLogger("Driver") */ fun driver( baseDirectory: String = "build/${getTimestampAsDirectoryName()}", + nodeConfigurationPath: String = "reference.conf", quasarJarPath: String = "lib/quasar.jar", portAllocation: PortAllocation = PortAllocation.Incremental(10000), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), @@ -75,6 +81,7 @@ fun driver( portAllocation = portAllocation, debugPortAllocation = debugPortAllocation, baseDirectory = baseDirectory, + nodeConfigurationPath = nodeConfigurationPath, quasarJarPath = quasarJarPath ) driverDsl.start() @@ -125,6 +132,7 @@ class DriverDSL( private val portAllocation: PortAllocation, private val debugPortAllocation: PortAllocation, val baseDirectory: String, + val nodeConfigurationPath: String, val quasarJarPath: String ) : DriverDSLInterface { @@ -134,15 +142,19 @@ class DriverDSL( private lateinit var networkMapNodeInfo: NodeInfo private val registeredProcesses = LinkedList() + val nodeConfiguration = + NodeConfigurationFromConfig( + ConfigFactory.parseResources( + nodeConfigurationPath, + ConfigParseOptions.defaults().setAllowMissing(false) + ) + ).copy( + myLegalName = "driver-artemis" + ) + val messagingService = ArtemisMessagingClient( Paths.get(baseDirectory, "driver-artemis"), - object : NodeConfiguration { - override val myLegalName = "driver-artemis" - override val exportJMXto = "" - override val nearestCity = "Zion" - override val keyStorePassword = "keypass" - override val trustStorePassword = "trustpass" - }, + nodeConfiguration, serverHostPort = networkMapAddress, myHostPort = portAllocation.nextHostAndPort() ) @@ -178,7 +190,6 @@ class DriverDSL( val apiAddress = portAllocation.nextHostAndPort() val debugPort = debugPortAllocation.nextPort() val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" - val nearestCity = pickA(city) val driverCliParams = NodeRunner.CliParams( services = advertisedServices, @@ -188,7 +199,7 @@ class DriverDSL( messagingAddress = messagingAddress, apiAddress = apiAddress, baseDirectory = baseDirectory, - nearestCity = nearestCity, + nodeConfigurationPath = nodeConfigurationPath, legalName = name ) registerProcess(startNode(driverCliParams, quasarJarPath, debugPort)) @@ -228,6 +239,8 @@ class DriverDSL( } } + + private fun startNetworkMapService() { val apiAddress = portAllocation.nextHostAndPort() val debugPort = debugPortAllocation.nextPort() @@ -239,7 +252,7 @@ class DriverDSL( messagingAddress = networkMapAddress, apiAddress = apiAddress, baseDirectory = baseDirectory, - nearestCity = pickA(city), + nodeConfigurationPath = nodeConfigurationPath, legalName = networkMapName ) log.info("Starting network-map-service") @@ -248,12 +261,6 @@ class DriverDSL( companion object { - val city = arrayOf( - "London", - "Paris", - "New York", - "Tokyo" - ) val name = arrayOf( "Alice", "Bob", diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 2214d40f67..7ea1f777df 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -7,9 +7,12 @@ import com.r3corda.core.crypto.toBase58String import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.node.internal.Node -import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient +import com.r3corda.node.services.config.NodeConfigurationFromConfig +import com.r3corda.node.services.config.copy import com.r3corda.node.services.network.NetworkMapService +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigParseOptions import joptsimple.ArgumentAcceptingOptionSpec import joptsimple.OptionParser import joptsimple.OptionSet @@ -45,18 +48,21 @@ class NodeRunner { } else { null } + val nodeConfiguration = + NodeConfigurationFromConfig( + ConfigFactory.parseResources( + nodeConfigurationPath, + ConfigParseOptions.defaults().setAllowMissing(false) + ) + ).copy( + myLegalName = legalName + ) val node = Node( dir = nodeDirectory, p2pAddr = messagingAddress, webServerAddr = apiAddress, - configuration = object : NodeConfiguration { - override val myLegalName = legalName - override val exportJMXto = "" - override val nearestCity = cliParams.nearestCity - override val keyStorePassword = "keypass" - override val trustStorePassword = "trustpass" - }, + configuration = nodeConfiguration, networkMapAddress = networkMapNodeInfo, advertisedServices = services.toSet() ) @@ -75,8 +81,8 @@ class NodeRunner { val messagingAddress: HostAndPort, val apiAddress: HostAndPort, val baseDirectory: String, - val legalName: String, - val nearestCity: String + val nodeConfigurationPath: String, + val legalName: String ) { companion object { @@ -95,8 +101,8 @@ class NodeRunner { parser.accepts("api-address").withRequiredArg().ofType(String::class.java) val baseDirectory = parser.accepts("base-directory").withRequiredArg().ofType(String::class.java) - val nearestCity = - parser.accepts("nearest-city").withRequiredArg().ofType(String::class.java) + val nodeConfigurationPath = + parser.accepts("node-configuration-path").withRequiredArg().ofType(String::class.java) val legalName = parser.accepts("legal-name").withRequiredArg().ofType(String::class.java) @@ -114,7 +120,7 @@ class NodeRunner { val messagingAddress = requiredArgument(optionSet, messagingAddress) val apiAddress = requiredArgument(optionSet, apiAddress) val baseDirectory = requiredArgument(optionSet, baseDirectory) - val nearestCity = requiredArgument(optionSet, nearestCity) + val nodeConfigurationPath = requiredArgument(optionSet, nodeConfigurationPath) val legalName = requiredArgument(optionSet, legalName) return CliParams( @@ -125,8 +131,8 @@ class NodeRunner { networkMapName = networkMapName, networkMapPublicKey = networkMapPublicKey, networkMapAddress = networkMapAddress?.let { HostAndPort.fromString(it) }, - legalName = legalName, - nearestCity = nearestCity + nodeConfigurationPath = nodeConfigurationPath, + legalName = legalName ) } } @@ -153,8 +159,8 @@ class NodeRunner { cliArguments.add(apiAddress.toString()) cliArguments.add("--base-directory") cliArguments.add(baseDirectory.toString()) - cliArguments.add("--nearest-city") - cliArguments.add(nearestCity) + cliArguments.add("--node-configuration-path") + cliArguments.add(nodeConfigurationPath) cliArguments.add("--legal-name") cliArguments.add(legalName) return cliArguments @@ -162,7 +168,5 @@ class NodeRunner { } } -fun createNodeRunDirectory(directory: Path) { - directory.toFile().mkdirs() -} +fun createNodeRunDirectory(directory: Path) = directory.toFile().mkdirs() diff --git a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt index 3cefe85e1e..94d536fe48 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt @@ -96,4 +96,20 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { messagingServerAddress ) } -} \ No newline at end of file +} + +fun NodeConfiguration.copy( + myLegalName: String = this.myLegalName, + exportJMXto: String = this.exportJMXto, + nearestCity: String = this.nearestCity, + keyStorePassword: String = this.keyStorePassword, + trustStorePassword: String = this.trustStorePassword +): NodeConfiguration { + return object : NodeConfiguration { + override val myLegalName: String = myLegalName + override val exportJMXto: String = exportJMXto + override val nearestCity: String = nearestCity + override val keyStorePassword: String = keyStorePassword + override val trustStorePassword: String = trustStorePassword + } +} diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 8af22c0e43..6763d95b72 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -13,7 +13,7 @@ class DriverTests { fun simpleNodeStartupShutdownWorks() { // Start a notary - val (handle, notaryNodeInfo) = driver(quasarPath = "../lib/quasar.jar") { + val (handle, notaryNodeInfo) = driver(quasarJarPath = "../lib/quasar.jar") { startNode(setOf(NotaryService.Type), "TestNotary") } // Check that the node is registered in the network map From e7677c91ce1bb99643e46e6980cd0b560dfba8c6 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 2 Aug 2016 11:34:46 +0100 Subject: [PATCH 09/20] node: Add DriverDSL(Exposed/Internal)Interface, add generic `driver` method to allow extension --- .../kotlin/com/r3corda/node/driver/Driver.kt | 59 ++++++++++++++----- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 76a3016e23..007504f268 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -3,7 +3,9 @@ package com.r3corda.node.driver import com.google.common.net.HostAndPort import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.generateKeyPair +import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient @@ -15,7 +17,6 @@ import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.io.File import java.net.ServerSocket import java.net.Socket import java.net.SocketException @@ -48,10 +49,21 @@ import java.util.concurrent.TimeoutException * TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought. */ -interface DriverDSLInterface { +/** + * This is the interface that's exposed to + */ +interface DriverDSLExposedInterface { fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo } +interface DriverDSLInternalInterface : DriverDSLExposedInterface { + val messagingService: MessagingService + val networkMapCache: NetworkMapCache + fun start() + fun shutdown() + fun waitForAllNodesToFinish() +} + sealed class PortAllocation { abstract fun nextPort(): Int fun nextHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) @@ -75,17 +87,35 @@ fun driver( quasarJarPath: String = "lib/quasar.jar", portAllocation: PortAllocation = PortAllocation.Incremental(10000), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), - dsl: DriverDSL.() -> A -): Pair { - val driverDsl = DriverDSL( + dsl: DriverDSLExposedInterface.() -> A +) = genericDriver( + driverDsl = DriverDSL( portAllocation = portAllocation, debugPortAllocation = debugPortAllocation, baseDirectory = baseDirectory, nodeConfigurationPath = nodeConfigurationPath, quasarJarPath = quasarJarPath - ) + ), + coerce = { it }, + dsl = dsl +) + + +/** + * This is a helper method to allow extending of the DSL, along the lines of + * interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface + * interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface + * class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface + * + * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause + */ +fun genericDriver( + driverDsl: D, + coerce: (D) -> DI, + dsl: DI.() -> A +): Pair { driverDsl.start() - val returnValue = dsl(driverDsl) + val returnValue = dsl(coerce(driverDsl)) val shutdownHook = Thread({ driverDsl.shutdown() }) @@ -100,7 +130,7 @@ private fun getTimestampAsDirectoryName(): String { return df.format(Date()) } -class DriverHandle(private val driverDsl: DriverDSL, private val shutdownHook: Thread) { +class DriverHandle(private val driverDsl: DriverDSLInternalInterface, private val shutdownHook: Thread) { val messagingService = driverDsl.messagingService val networkMapCache = driverDsl.networkMapCache @@ -134,9 +164,9 @@ class DriverDSL( val baseDirectory: String, val nodeConfigurationPath: String, val quasarJarPath: String -) : DriverDSLInterface { +) : DriverDSLInternalInterface { - val networkMapCache = InMemoryNetworkMapCache(null) + override val networkMapCache = InMemoryNetworkMapCache(null) private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() private lateinit var networkMapNodeInfo: NodeInfo @@ -152,7 +182,7 @@ class DriverDSL( myLegalName = "driver-artemis" ) - val messagingService = ArtemisMessagingClient( + override val messagingService = ArtemisMessagingClient( Paths.get(baseDirectory, "driver-artemis"), nodeConfiguration, serverHostPort = networkMapAddress, @@ -161,13 +191,13 @@ class DriverDSL( fun registerProcess(process: Process) = registeredProcesses.push(process) - internal fun waitForAllNodesToFinish() { + override fun waitForAllNodesToFinish() { registeredProcesses.forEach { it.waitFor() } } - internal fun shutdown() { + override fun shutdown() { registeredProcesses.forEach { it.destroy() } @@ -214,8 +244,7 @@ class DriverDSL( } } - internal fun start() { - startNetworkMapService() + override fun start() { messagingService.configureWithDevSSLCertificate() messagingService.start() // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from From 9d071809efc38c8fc594dc4ae43a7f185b716397 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 2 Aug 2016 13:58:22 +0100 Subject: [PATCH 10/20] node: Expose port allocation in Driver --- node/src/main/kotlin/com/r3corda/node/driver/Driver.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 007504f268..37698ef6ae 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -159,8 +159,8 @@ fun poll(f: () -> A?): A { } class DriverDSL( - private val portAllocation: PortAllocation, - private val debugPortAllocation: PortAllocation, + val portAllocation: PortAllocation, + val debugPortAllocation: PortAllocation, val baseDirectory: String, val nodeConfigurationPath: String, val quasarJarPath: String From feecc3666104d7125dc412040d1a9afb4b952dcd Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 Aug 2016 12:04:08 +0100 Subject: [PATCH 11/20] node: Address PR comments, better resource releasing, add kdoc --- .../kotlin/com/r3corda/node/driver/Driver.kt | 81 +++++++++----- .../com/r3corda/node/driver/NodeRunner.kt | 9 +- .../com/r3corda/node/driver/DriverTests.kt | 103 +++++++++++++----- 3 files changed, 129 insertions(+), 64 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 37698ef6ae..c66e6bc70b 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -30,18 +30,8 @@ import java.util.concurrent.TimeoutException /** * This file defines a small "Driver" DSL for starting up nodes. * - * The process the driver is run behaves as an Artemis client and starts up other processes. Namely it first - * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes/explorers. - * - * Usage: - * driver { - * startNode(setOf(NotaryService.Type), "Notary") - * val aliceMonitor = startNode(setOf(WalletMonitorService.Type), "Alice") - * } - * - * The base directory node directories go into may be specified in [driver] and defaults to "build//". - * The node directories themselves are "//", where legalName defaults to - * "-" and may be specified in [DriverDSL.startNode]. + * The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first + * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes. * * TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done. * TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow. @@ -53,12 +43,12 @@ import java.util.concurrent.TimeoutException * This is the interface that's exposed to */ interface DriverDSLExposedInterface { - fun startNode(advertisedServices: Set, providedName: String? = null): NodeInfo + fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): NodeInfo + val messagingService: MessagingService + val networkMapCache: NetworkMapCache } interface DriverDSLInternalInterface : DriverDSLExposedInterface { - val messagingService: MessagingService - val networkMapCache: NetworkMapCache fun start() fun shutdown() fun waitForAllNodesToFinish() @@ -66,7 +56,7 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface { sealed class PortAllocation { abstract fun nextPort(): Int - fun nextHostAndPort() = HostAndPort.fromParts("localhost", nextPort()) + fun nextHostAndPort(): HostAndPort = HostAndPort.fromParts("localhost", nextPort()) class Incremental(private var portCounter: Int) : PortAllocation() { override fun nextPort() = portCounter++ @@ -79,14 +69,35 @@ sealed class PortAllocation { private val log: Logger = LoggerFactory.getLogger("Driver") /** - * TODO: remove quasarJarPath once we have a proper way of bundling quasar - */ + * [driver] allows one to start up nodes like this: + * driver { + * val noService = startNode("NoService") + * val notary = startNode("Notary") + * + * (...) + * } + * + * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache] + * The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService] + * + * @param baseDirectory The base directory node directories go into, defaults to "build//". The node + * directories themselves are "//", where legalName defaults to "-" + * and may be specified in [DriverDSL.startNode]. + * @param nodeConfigurationPath The path to the node's .conf, defaults to "reference.conf". + * @param quasarJarPath The path to quasar.jar, relative to cwd. Defaults to "lib/quasar.jar". TODO remove this once we can bundle quasar properly. + * @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental. + * @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental. + * @param with A closure to be run once the nodes have started up. Defaults to empty closure. + * @param dsl The dsl itself + * @return The value returned in the [dsl] closure + */ fun driver( baseDirectory: String = "build/${getTimestampAsDirectoryName()}", nodeConfigurationPath: String = "reference.conf", quasarJarPath: String = "lib/quasar.jar", portAllocation: PortAllocation = PortAllocation.Incremental(10000), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), + with: (DriverHandle, A) -> Unit = {_driverHandle, _result -> }, dsl: DriverDSLExposedInterface.() -> A ) = genericDriver( driverDsl = DriverDSL( @@ -97,7 +108,8 @@ fun driver( quasarJarPath = quasarJarPath ), coerce = { it }, - dsl = dsl + dsl = dsl, + with = with ) @@ -112,15 +124,22 @@ fun driver( fun genericDriver( driverDsl: D, coerce: (D) -> DI, - dsl: DI.() -> A -): Pair { + dsl: DI.() -> A, + with: (DriverHandle, A) -> Unit +): A { driverDsl.start() val returnValue = dsl(coerce(driverDsl)) val shutdownHook = Thread({ driverDsl.shutdown() }) Runtime.getRuntime().addShutdownHook(shutdownHook) - return Pair(DriverHandle(driverDsl, shutdownHook), returnValue) + try { + with(DriverHandle(driverDsl), returnValue) + } finally { + driverDsl.shutdown() + Runtime.getRuntime().removeShutdownHook(shutdownHook) + } + return returnValue } private fun getTimestampAsDirectoryName(): String { @@ -130,18 +149,13 @@ private fun getTimestampAsDirectoryName(): String { return df.format(Date()) } -class DriverHandle(private val driverDsl: DriverDSLInternalInterface, private val shutdownHook: Thread) { +class DriverHandle(private val driverDsl: DriverDSLInternalInterface) { val messagingService = driverDsl.messagingService val networkMapCache = driverDsl.networkMapCache fun waitForAllNodesToFinish() { driverDsl.waitForAllNodesToFinish() } - - fun shutdown() { - driverDsl.shutdown() - Runtime.getRuntime().removeShutdownHook(shutdownHook) - } } fun poll(f: () -> A?): A { @@ -213,9 +227,18 @@ class DriverDSL( it.destroyForcibly() } } + messagingService.stop() } - override fun startNode(advertisedServices: Set, providedName: String?): NodeInfo { + /** + * Starts a [Node] in a separate process. + * + * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something + * random. Note that this must be unique as the driver uses it as a primary key! + * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. + * @return The [NodeInfo] of the started up node retrieved from the network map service. + */ + override fun startNode(providedName: String?, advertisedServices: Set): NodeInfo { val messagingAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort() val debugPort = debugPortAllocation.nextPort() diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 7ea1f777df..445ae8befd 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -111,9 +111,6 @@ class NodeRunner { fun parse(optionSet: OptionSet): CliParams { val services = optionSet.valuesOf(services) - if (services.isEmpty()) { - throw IllegalArgumentException("Must provide at least one --services") - } val networkMapName = optionSet.valueOf(networkMapName) val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.let { parsePublicKeyBase58(it) } val networkMapAddress = optionSet.valueOf(networkMapAddress) @@ -139,8 +136,10 @@ class NodeRunner { fun toCliArguments(): List { val cliArguments = LinkedList() - cliArguments.add("--services") - cliArguments.addAll(services.map { it.toString() }) + if (services.isNotEmpty()) { + cliArguments.add("--services") + cliArguments.addAll(services.map { it.toString() }) + } if (networkMapName != null) { cliArguments.add("--network-map-name") cliArguments.add(networkMapName) diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 6763d95b72..26b9384578 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -1,5 +1,9 @@ package com.r3corda.node.driver +import com.google.common.net.HostAndPort +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.NetworkMapCache +import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.transactions.NotaryService import org.junit.Test @@ -9,40 +13,79 @@ import java.net.SocketException class DriverTests { - @Test - fun simpleNodeStartupShutdownWorks() { - - // Start a notary - val (handle, notaryNodeInfo) = driver(quasarJarPath = "../lib/quasar.jar") { - startNode(setOf(NotaryService.Type), "TestNotary") - } - // Check that the node is registered in the network map - poll { - handle.networkMapCache.get(NotaryService.Type).firstOrNull { - it.identity.name == "TestNotary" - } - } - // Check that the port is bound - val address = notaryNodeInfo.address as ArtemisMessagingComponent.Address - poll { - try { - Socket(address.hostAndPort.hostText, address.hostAndPort.port).close() - Unit - } catch (_exception: SocketException) { - null + companion object { + fun addressMustBeBound(hostAndPort: HostAndPort) { + poll { + try { + Socket(hostAndPort.hostText, hostAndPort.port).close() + Unit + } catch (_exception: SocketException) { + null + } } } - // Shutdown - handle.shutdown() - // Check that the port is not bound - poll { - try { - Socket(address.hostAndPort.hostText, address.hostAndPort.port).close() - null - } catch (_exception: SocketException) { - Unit + fun addressMustNotBeBound(hostAndPort: HostAndPort) { + poll { + try { + Socket(hostAndPort.hostText, hostAndPort.port).close() + null + } catch (_exception: SocketException) { + Unit + } } } + + fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) { + val address = nodeInfo.address as ArtemisMessagingComponent.Address + // Check that the node is registered in the network map + poll { + networkMapCache.get().firstOrNull { + it.identity.name == nodeName + } + } + // Check that the port is bound + addressMustBeBound(address.hostAndPort) + } + + fun nodeMustBeDown(nodeInfo: NodeInfo) { + val address = nodeInfo.address as ArtemisMessagingComponent.Address + // Check that the port is bound + addressMustNotBeBound(address.hostAndPort) + } + } + + @Test + fun simpleNodeStartupShutdownWorks() { + val (notary, regulator) = driver(quasarJarPath = "../lib/quasar.jar") { + val notary = startNode("TestNotary", setOf(NotaryService.Type)) + val regulator = startNode("Regulator", setOf(RegulatorService.Type)) + + nodeMustBeUp(networkMapCache, notary, "TestNotary") + nodeMustBeUp(networkMapCache, regulator, "Regulator") + Pair(notary, regulator) + } + nodeMustBeDown(notary) + nodeMustBeDown(regulator) + } + + @Test + fun startingNodeWithNoServicesWorks() { + val noService = driver(quasarJarPath = "../lib/quasar.jar") { + val noService = startNode("NoService") + nodeMustBeUp(networkMapCache, noService, "NoService") + noService + } + nodeMustBeDown(noService) + } + + @Test + fun randomFreePortAllocationWorks() { + val nodeInfo = driver(quasarJarPath = "../lib/quasar.jar", portAllocation = PortAllocation.RandomFree()) { + val nodeInfo = startNode("NoService") + nodeMustBeUp(networkMapCache, nodeInfo, "NoService") + nodeInfo + } + nodeMustBeDown(nodeInfo) } } From a5e8c8692822ce71c83b7361435679534332f7f1 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 Aug 2016 12:31:27 +0100 Subject: [PATCH 12/20] node: Increase poll timeout in driver --- node/src/main/kotlin/com/r3corda/node/driver/Driver.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index c66e6bc70b..79bea2070d 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -161,7 +161,7 @@ class DriverHandle(private val driverDsl: DriverDSLInternalInterface) { fun poll(f: () -> A?): A { var counter = 0 var result = f() - while (result == null && counter < 30) { + while (result == null && counter < 120) { counter++ Thread.sleep(500) result = f() From 2e6de61ad0dcd0a1cc2cd4c1916c0cce50eaba32 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 Aug 2016 14:16:21 +0100 Subject: [PATCH 13/20] node: Fix compiler error due to rebase --- node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 445ae8befd..fd46d51821 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -112,7 +112,7 @@ class NodeRunner { fun parse(optionSet: OptionSet): CliParams { val services = optionSet.valuesOf(services) val networkMapName = optionSet.valueOf(networkMapName) - val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.let { parsePublicKeyBase58(it) } + val networkMapPublicKey = optionSet.valueOf(networkMapPublicKey)?.run { parsePublicKeyBase58(this) } val networkMapAddress = optionSet.valueOf(networkMapAddress) val messagingAddress = requiredArgument(optionSet, messagingAddress) val apiAddress = requiredArgument(optionSet, apiAddress) From df4413ab6863fa9bbc7436e93ceee16046455d9a Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 Aug 2016 14:55:21 +0100 Subject: [PATCH 14/20] node: Add extra Driver checks for node shutdown, extend try/finally scope of cleanup --- .../kotlin/com/r3corda/node/driver/Driver.kt | 49 +++++++++++++++---- .../com/r3corda/node/driver/NodeRunner.kt | 2 +- .../com/r3corda/node/driver/DriverTests.kt | 22 --------- 3 files changed, 40 insertions(+), 33 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 79bea2070d..5a279a73c2 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -39,6 +39,8 @@ import java.util.concurrent.TimeoutException * TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought. */ +private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java) + /** * This is the interface that's exposed to */ @@ -66,8 +68,6 @@ sealed class PortAllocation { } } -private val log: Logger = LoggerFactory.getLogger("Driver") - /** * [driver] allows one to start up nodes like this: * driver { @@ -127,19 +127,22 @@ fun genericD dsl: DI.() -> A, with: (DriverHandle, A) -> Unit ): A { - driverDsl.start() - val returnValue = dsl(coerce(driverDsl)) - val shutdownHook = Thread({ - driverDsl.shutdown() - }) - Runtime.getRuntime().addShutdownHook(shutdownHook) + var shutdownHook: Thread? = null try { + driverDsl.start() + val returnValue = dsl(coerce(driverDsl)) + shutdownHook = Thread({ + driverDsl.shutdown() + }) + Runtime.getRuntime().addShutdownHook(shutdownHook) with(DriverHandle(driverDsl), returnValue) + return returnValue } finally { driverDsl.shutdown() - Runtime.getRuntime().removeShutdownHook(shutdownHook) + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook) + } } - return returnValue } private fun getTimestampAsDirectoryName(): String { @@ -149,6 +152,28 @@ private fun getTimestampAsDirectoryName(): String { return df.format(Date()) } +fun addressMustBeBound(hostAndPort: HostAndPort) { + poll { + try { + Socket(hostAndPort.hostText, hostAndPort.port).close() + Unit + } catch (_exception: SocketException) { + null + } + } +} + +fun addressMustNotBeBound(hostAndPort: HostAndPort) { + poll { + try { + Socket(hostAndPort.hostText, hostAndPort.port).close() + null + } catch (_exception: SocketException) { + Unit + } + } +} + class DriverHandle(private val driverDsl: DriverDSLInternalInterface) { val messagingService = driverDsl.messagingService val networkMapCache = driverDsl.networkMapCache @@ -228,6 +253,10 @@ class DriverDSL( } } messagingService.stop() + + // Check that we shut down properly + addressMustNotBeBound(messagingService.myHostPort) + addressMustNotBeBound((networkMapNodeInfo.address as ArtemisMessagingService.Address).hostAndPort) } /** diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index fd46d51821..ba685619ba 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -23,7 +23,7 @@ import java.nio.file.Paths import java.security.PublicKey import java.util.* -private val log: Logger = LoggerFactory.getLogger("NodeRunner") +private val log: Logger = LoggerFactory.getLogger(NodeRunner::class.java) class NodeRunner { companion object { diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 26b9384578..55ea69a979 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -14,28 +14,6 @@ import java.net.SocketException class DriverTests { companion object { - fun addressMustBeBound(hostAndPort: HostAndPort) { - poll { - try { - Socket(hostAndPort.hostText, hostAndPort.port).close() - Unit - } catch (_exception: SocketException) { - null - } - } - } - - fun addressMustNotBeBound(hostAndPort: HostAndPort) { - poll { - try { - Socket(hostAndPort.hostText, hostAndPort.port).close() - null - } catch (_exception: SocketException) { - Unit - } - } - } - fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) { val address = nodeInfo.address as ArtemisMessagingComponent.Address // Check that the node is registered in the network map From 20fc20077128d396dd00716d5d0e66465af9adc4 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 Aug 2016 15:26:19 +0100 Subject: [PATCH 15/20] node: Fix checking of lateinited networkMapNodeInfo --- .../kotlin/com/r3corda/node/driver/Driver.kt | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 5a279a73c2..00682da851 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -7,10 +7,10 @@ import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType -import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.copy +import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.typesafe.config.ConfigFactory @@ -208,7 +208,7 @@ class DriverDSL( override val networkMapCache = InMemoryNetworkMapCache(null) private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() - private lateinit var networkMapNodeInfo: NodeInfo + private var networkMapNodeInfo: NodeInfo? = null private val registeredProcesses = LinkedList() val nodeConfiguration = @@ -256,7 +256,10 @@ class DriverDSL( // Check that we shut down properly addressMustNotBeBound(messagingService.myHostPort) - addressMustNotBeBound((networkMapNodeInfo.address as ArtemisMessagingService.Address).hostAndPort) + val nodeInfo = networkMapNodeInfo + if (nodeInfo != null) { + addressMustNotBeBound((nodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort) + } } /** @@ -275,8 +278,8 @@ class DriverDSL( val driverCliParams = NodeRunner.CliParams( services = advertisedServices, - networkMapName = networkMapNodeInfo.identity.name, - networkMapPublicKey = networkMapNodeInfo.identity.owningKey, + networkMapName = networkMapNodeInfo!!.identity.name, + networkMapPublicKey = networkMapNodeInfo!!.identity.owningKey, networkMapAddress = networkMapAddress, messagingAddress = messagingAddress, apiAddress = apiAddress, @@ -297,6 +300,7 @@ class DriverDSL( } override fun start() { + startNetworkMapService() messagingService.configureWithDevSSLCertificate() messagingService.start() // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from @@ -320,8 +324,6 @@ class DriverDSL( } } - - private fun startNetworkMapService() { val apiAddress = portAllocation.nextHostAndPort() val debugPort = debugPortAllocation.nextPort() From c9527a2cdd1779bb892ff937ddb5c683f4d965f8 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 8 Aug 2016 18:24:35 +0100 Subject: [PATCH 16/20] node-driver: Only stop() artemis if it started fine --- node/src/main/kotlin/com/r3corda/node/driver/Driver.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 00682da851..d857d1e75a 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -227,6 +227,7 @@ class DriverDSL( serverHostPort = networkMapAddress, myHostPort = portAllocation.nextHostAndPort() ) + var messagingServiceStarted = false fun registerProcess(process: Process) = registeredProcesses.push(process) @@ -252,7 +253,9 @@ class DriverDSL( it.destroyForcibly() } } - messagingService.stop() + if (messagingServiceStarted){ + messagingService.stop() + } // Check that we shut down properly addressMustNotBeBound(messagingService.myHostPort) @@ -303,6 +306,7 @@ class DriverDSL( startNetworkMapService() messagingService.configureWithDevSSLCertificate() messagingService.start() + messagingServiceStarted = true // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // the network map service itself val nodeInfo = NodeInfo( From d0385c420e3b657fd89a80aefc15fc30600283bb Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 8 Aug 2016 18:27:40 +0100 Subject: [PATCH 17/20] node-driver: Remove parameter and DriverHandle, expose waitForAllNodesToFinish to dsl --- .../kotlin/com/r3corda/node/driver/Driver.kt | 20 +++---------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index d857d1e75a..ab1e08b106 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -46,6 +46,7 @@ private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java) */ interface DriverDSLExposedInterface { fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): NodeInfo + fun waitForAllNodesToFinish() val messagingService: MessagingService val networkMapCache: NetworkMapCache } @@ -53,7 +54,6 @@ interface DriverDSLExposedInterface { interface DriverDSLInternalInterface : DriverDSLExposedInterface { fun start() fun shutdown() - fun waitForAllNodesToFinish() } sealed class PortAllocation { @@ -87,7 +87,6 @@ sealed class PortAllocation { * @param quasarJarPath The path to quasar.jar, relative to cwd. Defaults to "lib/quasar.jar". TODO remove this once we can bundle quasar properly. * @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental. * @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental. - * @param with A closure to be run once the nodes have started up. Defaults to empty closure. * @param dsl The dsl itself * @return The value returned in the [dsl] closure */ @@ -97,7 +96,6 @@ fun driver( quasarJarPath: String = "lib/quasar.jar", portAllocation: PortAllocation = PortAllocation.Incremental(10000), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), - with: (DriverHandle, A) -> Unit = {_driverHandle, _result -> }, dsl: DriverDSLExposedInterface.() -> A ) = genericDriver( driverDsl = DriverDSL( @@ -108,8 +106,7 @@ fun driver( quasarJarPath = quasarJarPath ), coerce = { it }, - dsl = dsl, - with = with + dsl = dsl ) @@ -124,8 +121,7 @@ fun driver( fun genericDriver( driverDsl: D, coerce: (D) -> DI, - dsl: DI.() -> A, - with: (DriverHandle, A) -> Unit + dsl: DI.() -> A ): A { var shutdownHook: Thread? = null try { @@ -135,7 +131,6 @@ fun genericD driverDsl.shutdown() }) Runtime.getRuntime().addShutdownHook(shutdownHook) - with(DriverHandle(driverDsl), returnValue) return returnValue } finally { driverDsl.shutdown() @@ -174,15 +169,6 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) { } } -class DriverHandle(private val driverDsl: DriverDSLInternalInterface) { - val messagingService = driverDsl.messagingService - val networkMapCache = driverDsl.networkMapCache - - fun waitForAllNodesToFinish() { - driverDsl.waitForAllNodesToFinish() - } -} - fun poll(f: () -> A?): A { var counter = 0 var result = f() From 9e3220671c7bef2644da9892b0cbc5f587490b56 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 9 Aug 2016 14:54:23 +0100 Subject: [PATCH 18/20] node: stop() ClientFactory and ServerLocator when shutting down and ArtemisMessagingClient --- .../services/messaging/ArtemisMessagingClient.kt | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt index 2b2f5374e1..7b35d4bcdc 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt @@ -65,7 +65,8 @@ class ArtemisMessagingClient(directory: Path, private val mutex = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() - private lateinit var clientFactory: ClientSessionFactory + private var serverLocator: ServerLocator? = null + private var clientFactory: ClientSessionFactory? = null private var session: ClientSession? = null private var consumer: ClientConsumer? = null @@ -86,8 +87,11 @@ class ArtemisMessagingClient(directory: Path, private fun configureAndStartClient() { log.info("Connecting to server: $serverHostPort") // Connect to our server. - clientFactory = ActiveMQClient.createServerLocatorWithoutHA( - tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)).createSessionFactory() + val serverLocator = ActiveMQClient.createServerLocatorWithoutHA( + tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)) + this.serverLocator = serverLocator + val clientFactory = serverLocator.createSessionFactory() + this.clientFactory = clientFactory // Create a queue on which to receive messages and set up the handler. val session = clientFactory.createSession() @@ -168,6 +172,8 @@ class ArtemisMessagingClient(directory: Path, producers.clear() consumer?.close() session?.close() + clientFactory?.close() + serverLocator?.close() // We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here. running = false } From 5b2c687c64772af87ccb7f39c3aedcea11b6c901 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 9 Aug 2016 15:01:30 +0100 Subject: [PATCH 19/20] node-driver: Hacky fix of startup race condition --- .../kotlin/com/r3corda/node/driver/Driver.kt | 23 +++++++------------ .../com/r3corda/node/driver/DriverTests.kt | 2 -- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index ab1e08b106..14df357861 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -10,7 +10,6 @@ import com.r3corda.core.node.services.ServiceType import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.copy -import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.typesafe.config.ConfigFactory @@ -245,10 +244,7 @@ class DriverDSL( // Check that we shut down properly addressMustNotBeBound(messagingService.myHostPort) - val nodeInfo = networkMapNodeInfo - if (nodeInfo != null) { - addressMustNotBeBound((nodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort) - } + addressMustNotBeBound(networkMapAddress) } /** @@ -295,7 +291,7 @@ class DriverDSL( messagingServiceStarted = true // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // the network map service itself - val nodeInfo = NodeInfo( + val fakeNodeInfo = NodeInfo( address = ArtemisMessagingClient.makeRecipient(networkMapAddress), identity = Party( name = networkMapName, @@ -303,7 +299,7 @@ class DriverDSL( ), advertisedServices = setOf(NetworkMapService.Type) ) - networkMapCache.addMapService(messagingService, nodeInfo, true) + networkMapCache.addMapService(messagingService, fakeNodeInfo, true) networkMapNodeInfo = poll { networkMapCache.partyNodes.forEach { if (it.identity.name == networkMapName) { @@ -356,14 +352,11 @@ class DriverDSL( builder.redirectError(Paths.get("error.$className.log").toFile()) builder.inheritIO() val process = builder.start() - poll { - try { - Socket(cliParams.messagingAddress.hostText, cliParams.messagingAddress.port).close() - Unit - } catch (_exception: SocketException) { - null - } - } + addressMustBeBound(cliParams.messagingAddress) + // TODO There is a race condition here. Even though the messaging address is bound it may be the case that + // the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for + // the web api address to be bound as well, as that starts after the services. Needs rethinking. + addressMustBeBound(cliParams.apiAddress) return process } diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 55ea69a979..0e1ebc6b19 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -7,8 +7,6 @@ import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.transactions.NotaryService import org.junit.Test -import java.net.Socket -import java.net.SocketException class DriverTests { From 9df7a0faf080296f88eebe146f0f62a0c7d4238c Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 9 Aug 2016 15:38:33 +0100 Subject: [PATCH 20/20] node-driver: Fix compile error --- node/src/main/kotlin/com/r3corda/node/driver/Driver.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 14df357861..a7c4d6afc9 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -190,7 +190,7 @@ class DriverDSL( val quasarJarPath: String ) : DriverDSLInternalInterface { - override val networkMapCache = InMemoryNetworkMapCache(null) + override val networkMapCache = InMemoryNetworkMapCache() private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() private var networkMapNodeInfo: NodeInfo? = null