Fixed issue caused by race condition in process registration v node exit.

See https://github.com/corda/corda/issues/88
This commit is contained in:
josecoll 2016-12-23 15:09:13 +00:00
parent 1bcabc8d41
commit b4a5a03992

View File

@ -264,7 +264,7 @@ open class DriverDSL(
private val networkMapAddress = portAllocation.nextHostAndPort() private val networkMapAddress = portAllocation.nextHostAndPort()
class State { class State {
val registeredProcesses = LinkedList<Process>() val registeredProcesses = LinkedList<ListenableFuture<Process>>()
val clients = LinkedList<NodeMessagingClient>() val clients = LinkedList<NodeMessagingClient>()
} }
@ -279,12 +279,12 @@ open class DriverDSL(
Paths.get(quasarFileUrl.toURI()).toString() Paths.get(quasarFileUrl.toURI()).toString()
} }
fun registerProcess(process: Process) = state.locked { registeredProcesses.push(process) } fun registerProcess(process: ListenableFuture<Process>) = state.locked { registeredProcesses.push(process) }
override fun waitForAllNodesToFinish() { override fun waitForAllNodesToFinish() {
state.locked { state.locked {
registeredProcesses.forEach { registeredProcesses.forEach {
it.waitFor() it.getOrThrow().waitFor()
} }
} }
} }
@ -294,7 +294,9 @@ open class DriverDSL(
clients.forEach { clients.forEach {
it.stop() it.stop()
} }
registeredProcesses.forEach(Process::destroy) registeredProcesses.forEach {
it.get().destroy()
}
} }
/** Wait 5 seconds, then [Process.destroyForcibly] */ /** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit { val finishedFuture = executorService.submit {
@ -306,7 +308,7 @@ open class DriverDSL(
finishedFuture.cancel(true) finishedFuture.cancel(true)
state.locked { state.locked {
registeredProcesses.forEach { registeredProcesses.forEach {
it.destroyForcibly() it.get().destroyForcibly()
} }
} }
} }
@ -368,8 +370,9 @@ open class DriverDSL(
configOverrides = configOverrides configOverrides = configOverrides
) )
return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map { val startNode = startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort)
registerProcess(it) registerProcess(startNode)
return startNode.map {
NodeHandle(queryNodeInfo(apiAddress)!!, config, it) NodeHandle(queryNodeInfo(apiAddress)!!, config, it)
} }
} }
@ -409,7 +412,7 @@ open class DriverDSL(
startNetworkMapService() startNetworkMapService()
} }
private fun startNetworkMapService(): ListenableFuture<Unit> { private fun startNetworkMapService(): ListenableFuture<Process> {
val apiAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
@ -428,9 +431,9 @@ open class DriverDSL(
) )
log.info("Starting network-map-service") log.info("Starting network-map-service")
return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map { val startNode = startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort)
registerProcess(it) registerProcess(startNode)
} return startNode
} }
companion object { companion object {