Merged in aslemmer-node-driver-improvements (pull request #287)

node driver improvements
This commit is contained in:
Andras Slemmer 2016-08-19 17:11:07 +01:00
commit c3bf914d30
2 changed files with 178 additions and 91 deletions

View File

@ -1,15 +1,17 @@
package com.r3corda.node.driver
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.AffinityExecutor
@ -25,9 +27,7 @@ import java.net.URLClassLoader
import java.nio.file.Paths
import java.text.SimpleDateFormat
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.*
import kotlin.concurrent.thread
/**
@ -45,15 +45,45 @@ import kotlin.concurrent.thread
private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java)
/**
* This is the interface that's exposed to
* This is the interface that's exposed to DSL users.
*/
interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): NodeInfo
/**
* Starts a [Node] in a separate process.
*
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
*/
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): Future<NodeInfo>
/**
* Starts an [ArtemisMessagingClient].
*
* @param providedName name of the client, which will be used for creating its directory.
* @param serverAddress the artemis server to connect to, for example a [Node].
* @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null.
*/
fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future<ArtemisMessagingClient>
/**
* Starts a local [ArtemisMessagingServer] of which there may only be one.
*/
fun startLocalServer(): Future<ArtemisMessagingServer>
fun waitForAllNodesToFinish()
val messagingService: MessagingService
val networkMapCache: NetworkMapCache
}
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort)
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
startClient(
providedName = providedName ?: "${remoteNodeInfo.identity.name}-client",
serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort,
clientAddress = null
)
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun start()
fun shutdown()
@ -80,27 +110,32 @@ sealed class PortAllocation {
* (...)
* }
*
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]
* The driver is an artemis node itself, the messaging service may be accessed by [DriverDSL.messagingService]
* Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [Future]
* of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service.
*
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache].
*
* @param baseDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
* and may be specified in [DriverDSL.startNode].
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
* @param debugPortAllocation The port allocation strategy to use for jvm debugging. Defaults to incremental.
* @param dsl The dsl itself
* @return The value returned in the [dsl] closure
* @param isDebug Indicates whether the spawned nodes should start in jdwt debug mode.
* @param dsl The dsl itself.
* @return The value returned in the [dsl] closure.
*/
fun <A> driver(
baseDirectory: String = "build/${getTimestampAsDirectoryName()}",
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
isDebug: Boolean = false,
dsl: DriverDSLExposedInterface.() -> A
) = genericDriver(
driverDsl = DriverDSL(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
baseDirectory = baseDirectory
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
baseDirectory = baseDirectory,
isDebug = isDebug
),
coerce = { it },
dsl = dsl
@ -112,7 +147,7 @@ fun <A> driver(
* interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface
* class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface
*
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause
* @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause.
*/
fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver(
driverDsl: D,
@ -144,7 +179,7 @@ private fun getTimestampAsDirectoryName(): String {
}
fun addressMustBeBound(hostAndPort: HostAndPort) {
poll {
poll("address $hostAndPort to bind") {
try {
Socket(hostAndPort.hostText, hostAndPort.port).close()
Unit
@ -155,7 +190,7 @@ fun addressMustBeBound(hostAndPort: HostAndPort) {
}
fun addressMustNotBeBound(hostAndPort: HostAndPort) {
poll {
poll("address $hostAndPort to unbind") {
try {
Socket(hostAndPort.hostText, hostAndPort.port).close()
null
@ -165,31 +200,37 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) {
}
}
fun <A> poll(f: () -> A?): A {
fun <A> poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120, f: () -> A?): A {
var counter = 0
var result = f()
while (result == null && counter < 120) {
counter++
Thread.sleep(500)
while (result == null) {
if (counter == warnCount) {
log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...")
}
counter = (counter % warnCount) + 1
Thread.sleep(pollIntervalMs)
result = f()
}
if (result == null) {
throw Exception("Poll timed out")
}
return result
}
class DriverDSL(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val baseDirectory: String
val baseDirectory: String,
val isDebug: Boolean
) : DriverDSLInternalInterface {
override val networkMapCache = InMemoryNetworkMapCache()
private val networkMapName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort()
private var networkMapNodeInfo: NodeInfo? = null
private val registeredProcesses = LinkedList<Process>()
class State {
val registeredProcesses = LinkedList<Process>()
val clients = LinkedList<ArtemisMessagingClient>()
var localServer: ArtemisMessagingServer? = null
}
private val state = ThreadBox(State())
//TODO: remove this once we can bundle quasar properly.
private val quasarJarPath: String by lazy {
@ -200,35 +241,24 @@ class DriverDSL(
Paths.get(quasarFileUrl.toURI()).toString()
}
val driverNodeConfiguration = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, "driver-artemis"),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to "driver-artemis"
)
)
)
override val messagingService = ArtemisMessagingClient(
Paths.get(baseDirectory, "driver-artemis"),
driverNodeConfiguration,
serverHostPort = networkMapAddress,
myHostPort = portAllocation.nextHostAndPort(),
executor = AffinityExecutor.ServiceAffinityExecutor("Client thread", 1)
)
var messagingServiceStarted = false
fun registerProcess(process: Process) = registeredProcesses.push(process)
fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) }
override fun waitForAllNodesToFinish() {
registeredProcesses.forEach {
it.waitFor()
state.locked {
registeredProcesses.forEach {
it.waitFor()
}
}
}
override fun shutdown() {
registeredProcesses.forEach(Process::destroy)
state.locked {
clients.forEach {
it.stop()
}
localServer?.stop()
registeredProcesses.forEach(Process::destroy)
}
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = Executors.newSingleThreadExecutor().submit {
waitForAllNodesToFinish()
@ -237,30 +267,24 @@ class DriverDSL(
finishedFuture.get(5, TimeUnit.SECONDS)
} catch (exception: TimeoutException) {
finishedFuture.cancel(true)
registeredProcesses.forEach {
it.destroyForcibly()
state.locked {
registeredProcesses.forEach {
it.destroyForcibly()
}
}
}
if (messagingServiceStarted)
messagingService.stop()
// Check that we shut down properly
addressMustNotBeBound(messagingService.myHostPort)
state.locked {
localServer?.run { addressMustNotBeBound(myHostPort) }
}
addressMustNotBeBound(networkMapAddress)
}
/**
* Starts a [Node] in a separate process.
*
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
*/
override fun startNode(providedName: String?, advertisedServices: Set<ServiceType>): NodeInfo {
override fun startNode(providedName: String?, advertisedServices: Set<ServiceType>): Future<NodeInfo> {
val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = debugPortAllocation.nextPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val name = providedName ?: "${pickA(name)}-${messagingAddress.port}"
val nodeDirectory = "$baseDirectory/$name"
@ -282,26 +306,86 @@ class DriverDSL(
apiAddress = apiAddress,
baseDirectory = nodeDirectory
)
registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort))
return poll {
networkMapCache.partyNodes.forEach {
if (it.identity.name == name) {
return@poll it
return Executors.newSingleThreadExecutor().submit(Callable<NodeInfo> {
registerProcess(DriverDSL.startNode(config, driverCliParams, name, quasarJarPath, debugPort))
poll("network map cache for $name") {
networkMapCache.partyNodes.forEach {
if (it.identity.name == name) {
return@poll it
}
}
null
}
null
}
})
}
override fun startClient(
providedName: String,
serverAddress: HostAndPort,
clientAddress: HostAndPort?
): Future<ArtemisMessagingClient> {
val nodeConfiguration = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, providedName),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to providedName
)
)
)
val client = ArtemisMessagingClient(
Paths.get(baseDirectory, providedName),
nodeConfiguration,
serverHostPort = serverAddress,
myHostPort = clientAddress ?: serverAddress,
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1)
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingClient> {
client.configureWithDevSSLCertificate()
client.start()
thread { client.run() }
state.locked {
clients.add(client)
}
client
})
}
override fun startLocalServer(): Future<ArtemisMessagingServer> {
val name = "driver-local-server"
val config = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig(
baseDirectoryPath = Paths.get(baseDirectory, name),
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to name
)
)
)
val server = ArtemisMessagingServer(
Paths.get(baseDirectory, name),
config,
portAllocation.nextHostAndPort()
)
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
server.configureWithDevSSLCertificate()
server.start()
state.locked {
localServer = server
}
server
})
}
override fun start() {
startNetworkMapService()
messagingService.configureWithDevSSLCertificate()
messagingService.start()
thread { messagingService.run() }
messagingServiceStarted = true
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get()
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
// the network map service itself
// the network map service itself.
val fakeNodeInfo = NodeInfo(
address = ArtemisMessagingClient.makeRecipient(networkMapAddress),
identity = Party(
@ -310,8 +394,8 @@ class DriverDSL(
),
advertisedServices = setOf(NetworkMapService.Type)
)
networkMapCache.addMapService(messagingService, fakeNodeInfo, true)
networkMapNodeInfo = poll {
networkMapCache.addMapService(networkMapClient, fakeNodeInfo, true)
networkMapNodeInfo = poll("network map cache for $networkMapName") {
networkMapCache.partyNodes.forEach {
if (it.identity.name == networkMapName) {
return@poll it
@ -323,7 +407,7 @@ class DriverDSL(
private fun startNetworkMapService() {
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = debugPortAllocation.nextPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val nodeDirectory = "$baseDirectory/$networkMapName"
@ -363,7 +447,7 @@ class DriverDSL(
cliParams: NodeRunner.CliParams,
legalName: String,
quasarJarPath: String,
debugPort: Int
debugPort: Int?
): Process {
// Write node.conf
@ -375,9 +459,12 @@ class DriverDSL(
val path = System.getProperty("java.home") + separator + "bin" + separator + "java"
val javaArgs = listOf(path) +
listOf("-Dname=$legalName", "-javaagent:$quasarJarPath",
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort",
"-cp", classpath, className) +
cliParams.toCliArguments()
cliParams.toCliArguments() +
if (debugPort != null)
listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort")
else
listOf()
val builder = ProcessBuilder(javaArgs)
builder.redirectError(Paths.get("error.$className.log").toFile())
builder.inheritIO()

View File

@ -13,7 +13,7 @@ class DriverTests {
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
val address = nodeInfo.address as ArtemisMessagingComponent.Address
// Check that the node is registered in the network map
poll {
poll("network map cache for $nodeName") {
networkMapCache.get().firstOrNull {
it.identity.name == nodeName
}
@ -35,9 +35,9 @@ class DriverTests {
val notary = startNode("TestNotary", setOf(NotaryService.Type))
val regulator = startNode("Regulator", setOf(RegulatorService.Type))
nodeMustBeUp(networkMapCache, notary, "TestNotary")
nodeMustBeUp(networkMapCache, regulator, "Regulator")
Pair(notary, regulator)
nodeMustBeUp(networkMapCache, notary.get(), "TestNotary")
nodeMustBeUp(networkMapCache, regulator.get(), "Regulator")
Pair(notary.get(), regulator.get())
}
nodeMustBeDown(notary)
nodeMustBeDown(regulator)
@ -47,8 +47,8 @@ class DriverTests {
fun startingNodeWithNoServicesWorks() {
val noService = driver {
val noService = startNode("NoService")
nodeMustBeUp(networkMapCache, noService, "NoService")
noService
nodeMustBeUp(networkMapCache, noService.get(), "NoService")
noService.get()
}
nodeMustBeDown(noService)
}
@ -57,8 +57,8 @@ class DriverTests {
fun randomFreePortAllocationWorks() {
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
val nodeInfo = startNode("NoService")
nodeMustBeUp(networkMapCache, nodeInfo, "NoService")
nodeInfo
nodeMustBeUp(networkMapCache, nodeInfo.get(), "NoService")
nodeInfo.get()
}
nodeMustBeDown(nodeInfo)
}