Merge pull request #381 from corda/aslemmer-driver-improvements

driver: Add ShutdownManager, make network map service optional
This commit is contained in:
Andras Slemmer 2017-03-21 18:38:18 +00:00 committed by GitHub
commit 26044e543d

View File

@ -1,15 +1,15 @@
@file:JvmName("Driver") @file:JvmName("Driver")
package net.corda.node.driver package net.corda.node.driver
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.*
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions import com.typesafe.config.ConfigRenderOptions
import net.corda.core.* import net.corda.core.ThreadBox
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.map
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
@ -92,6 +92,12 @@ interface DriverDSLExposedInterface {
*/ */
fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort> fun startWebserver(handle: NodeHandle): ListenableFuture<HostAndPort>
/**
* Starts a network map service node. Note that only a single one should ever be running, so you will probably want
* to set automaticallyStartNetworkMap to false in your [driver] call.
*/
fun startNetworkMapService()
fun waitForAllNodesToFinish() fun waitForAllNodesToFinish()
} }
@ -161,6 +167,7 @@ fun <A> driver(
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
systemProperties: Map<String, String> = emptyMap(), systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false, useTestClock: Boolean = false,
automaticallyStartNetworkMap: Boolean = true,
dsl: DriverDSLExposedInterface.() -> A dsl: DriverDSLExposedInterface.() -> A
) = genericDriver( ) = genericDriver(
driverDsl = DriverDSL( driverDsl = DriverDSL(
@ -169,6 +176,7 @@ fun <A> driver(
systemProperties = systemProperties, systemProperties = systemProperties,
driverDirectory = driverDirectory.toAbsolutePath(), driverDirectory = driverDirectory.toAbsolutePath(),
useTestClock = useTestClock, useTestClock = useTestClock,
automaticallyStartNetworkMap = automaticallyStartNetworkMap,
isDebug = isDebug isDebug = isDebug
), ),
coerce = { it }, coerce = { it },
@ -235,7 +243,7 @@ fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort
} }
} }
private fun <A> poll( fun <A> poll(
executorService: ScheduledExecutorService, executorService: ScheduledExecutorService,
pollName: String, pollName: String,
pollIntervalMs: Long = 500, pollIntervalMs: Long = 500,
@ -267,21 +275,74 @@ private fun <A> poll(
return resultFuture return resultFuture
} }
open class DriverDSL( class ShutdownManager(private val executorService: ExecutorService) {
private class State {
val registeredShutdowns = ArrayList<ListenableFuture<() -> Unit>>()
var isShutdown = false
}
private val state = ThreadBox(State())
fun shutdown() {
val shutdownFutures = state.locked {
require(!isShutdown)
isShutdown = true
registeredShutdowns
}
val shutdownsFuture = Futures.allAsList(shutdownFutures)
val shutdowns = try {
shutdownsFuture.get(1, SECONDS)
} catch (exception: TimeoutException) {
/** Could not get all of them, collect what we have */
shutdownFutures.filter { it.isDone }.map { it.get() }
}
shutdowns.reversed().forEach{ it() }
}
fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) {
state.locked {
require(!isShutdown)
registeredShutdowns.add(shutdown)
}
}
fun registerProcessShutdown(processFuture: ListenableFuture<Process>) {
val processShutdown = processFuture.map { process ->
{
process.destroy()
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit {
process.waitFor()
}
try {
finishedFuture.get(5, SECONDS)
} catch (exception: TimeoutException) {
finishedFuture.cancel(true)
process.destroyForcibly()
}
Unit
}
}
registerShutdown(processShutdown)
}
}
class DriverDSL(
val portAllocation: PortAllocation, val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation, val debugPortAllocation: PortAllocation,
val systemProperties: Map<String, String>, val systemProperties: Map<String, String>,
val driverDirectory: Path, val driverDirectory: Path,
val useTestClock: Boolean, val useTestClock: Boolean,
val isDebug: Boolean val isDebug: Boolean,
val automaticallyStartNetworkMap: Boolean
) : DriverDSLInternalInterface { ) : DriverDSLInternalInterface {
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
private val networkMapLegalName = "NetworkMapService" private val networkMapLegalName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort() private val networkMapAddress = portAllocation.nextHostAndPort()
val executorService: ListeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(2))
val shutdownManager = ShutdownManager(executorService)
class State { class State {
val registeredProcesses = LinkedList<ListenableFuture<Process>>()
val clients = LinkedList<NodeMessagingClient>() val clients = LinkedList<NodeMessagingClient>()
val processes = ArrayList<ListenableFuture<Process>>()
} }
private val state = ThreadBox(State()) private val state = ThreadBox(State())
@ -295,37 +356,24 @@ open class DriverDSL(
Paths.get(quasarFileUrl.toURI()).toString() Paths.get(quasarFileUrl.toURI()).toString()
} }
fun registerProcess(process: ListenableFuture<Process>) = state.locked { registeredProcesses.push(process) } fun registerProcess(process: ListenableFuture<Process>) {
shutdownManager.registerProcessShutdown(process)
override fun waitForAllNodesToFinish() {
state.locked { state.locked {
registeredProcesses.forEach { processes.add(process)
it.getOrThrow().waitFor()
} }
} }
override fun waitForAllNodesToFinish() = state.locked {
Futures.allAsList(processes).get().forEach {
it.waitFor()
}
} }
override fun shutdown() { override fun shutdown() {
state.locked { state.locked {
clients.forEach(NodeMessagingClient::stop) clients.forEach(NodeMessagingClient::stop)
registeredProcesses.forEach {
it.get().destroy()
}
}
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit {
waitForAllNodesToFinish()
}
try {
finishedFuture.get(5, SECONDS)
} catch (exception: TimeoutException) {
finishedFuture.cancel(true)
state.locked {
registeredProcesses.forEach {
it.get().destroyForcibly()
}
}
} }
shutdownManager.shutdown()
// Check that we shut down properly // Check that we shut down properly
addressMustNotBeBound(executorService, networkMapAddress).get() addressMustNotBeBound(executorService, networkMapAddress).get()
@ -458,10 +506,12 @@ open class DriverDSL(
} }
override fun start() { override fun start() {
if (automaticallyStartNetworkMap) {
startNetworkMapService() startNetworkMapService()
} }
}
private fun startNetworkMapService(): ListenableFuture<Process> { override fun startNetworkMapService() {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val apiAddress = portAllocation.nextHostAndPort().toString() val apiAddress = portAllocation.nextHostAndPort().toString()
val baseDirectory = driverDirectory / networkMapLegalName val baseDirectory = driverDirectory / networkMapLegalName
@ -481,7 +531,6 @@ open class DriverDSL(
log.info("Starting network-map-service") log.info("Starting network-map-service")
val startNode = startNode(executorService, FullNodeConfiguration(baseDirectory, config), quasarJarPath, debugPort, systemProperties) val startNode = startNode(executorService, FullNodeConfiguration(baseDirectory, config), quasarJarPath, debugPort, systemProperties)
registerProcess(startNode) registerProcess(startNode)
return startNode
} }
companion object { companion object {
@ -571,5 +620,5 @@ open class DriverDSL(
fun writeConfig(path: Path, filename: String, config: Config) { fun writeConfig(path: Path, filename: String, config: Config) {
path.toFile().mkdirs() path.toFile().mkdirs()
File("$path/$filename").writeText(config.root().render(ConfigRenderOptions.concise())) File("$path/$filename").writeText(config.root().render(ConfigRenderOptions.defaults()))
} }