CORDA-1030 - allow nodes to be waited for inprocess (#2678)

* allow nodes to be waited for inprocess

* review comments

* Andr3ej suggested fix for test

* fix build and rebase

* add 2 spaces indentation to line 892 of DriverDSLImpl.kt

* add one space to line 799

* remove one space from line 892

* (re)add another space to line 799
This commit is contained in:
Stefano Franz 2018-03-06 11:25:05 +00:00 committed by Katelyn Baker
parent 41edc88dd6
commit 53bb9864df
3 changed files with 63 additions and 6 deletions

View File

@ -3,6 +3,8 @@ package net.corda.testing.driver
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.CertRole
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.div
import net.corda.core.internal.list
@ -22,10 +24,12 @@ import net.corda.testing.http.HttpApi
import net.corda.testing.node.NotarySpec
import org.assertj.core.api.Assertions.*
import org.json.simple.JSONObject
import org.junit.Assert
import org.junit.Test
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.*
import java.util.concurrent.*
import kotlin.streams.toList
import kotlin.test.assertEquals
class DriverTests {
private companion object {
@ -153,5 +157,31 @@ class DriverTests {
}
}
@Test
fun `driver waits for nodes to finish`() {
fun NodeHandle.stopQuietly() = try {
stop()
} catch (t: Throwable) {
t.printStackTrace()
}
val handlesFuture = openFuture<List<NodeHandle>>()
val driverExit = CountDownLatch(1)
val testFuture = ForkJoinPool.commonPool().fork {
val handles = LinkedList(handlesFuture.getOrThrow())
val last = handles.removeLast()
handles.forEach { it.stopQuietly() }
assertEquals(1, driverExit.count)
last.stopQuietly()
}
driver(DriverParameters(startNodesInProcess = true, waitForAllNodesToFinish = true)) {
val nodeA = newNode(DUMMY_BANK_A_NAME)().getOrThrow()
handlesFuture.set(listOf(nodeA) + notaryHandles.map { it.nodeHandles.getOrThrow() }.flatten())
}
driverExit.countDown()
testFuture.getOrThrow()
}
private fun DriverDSL.newNode(name: CordaX500Name) = { startNode(NodeParameters(providedName = name)) }
}

View File

@ -75,6 +75,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
import net.corda.nodeapi.internal.config.User as InternalUser
@ -110,8 +111,13 @@ class DriverDSLImpl(
private lateinit var _notaries: CordaFuture<List<NotaryHandle>>
override val notaryHandles: List<NotaryHandle> get() = _notaries.getOrThrow()
interface Waitable {
@Throws(InterruptedException::class)
fun waitFor(): Unit
}
class State {
val processes = ArrayList<Process>()
val processes = ArrayList<Waitable>()
}
private val state = ThreadBox(State())
@ -665,20 +671,32 @@ class DriverDSLImpl(
}
}
)
return nodeAndThreadFuture.flatMap { (node, thread) ->
val nodeFuture: CordaFuture<NodeHandle> = nodeAndThreadFuture.flatMap { (node, thread) ->
establishRpc(config, openFuture()).flatMap { rpc ->
allNodesConnected(rpc).map {
InProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, thread, onNodeExit, node)
}
}
}
state.locked {
processes += object : Waitable {
override fun waitFor() {
nodeAndThreadFuture.getOrThrow().second.join()
}
}
}
return nodeFuture
} else {
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val monitorPort = if (jmxPolicy.startJmxHttpServer) jmxPolicy.jmxHttpServerPortAllocation?.nextPort() else null
val process = startOutOfProcessNode(config, quasarJarPath, debugPort, jolokiaJarPath, monitorPort, systemProperties, cordappPackages, maximumHeapSize)
if (waitForAllNodesToFinish) {
state.locked {
processes += process
processes += object : Waitable {
override fun waitFor() {
process.waitFor()
}
}
}
} else {
shutdownManager.registerProcessShutdown(process)

View File

@ -3,7 +3,10 @@ package net.corda.testing.node.internal
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.utilities.*
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
@ -12,6 +15,7 @@ import java.util.concurrent.atomic.AtomicInteger
class ShutdownManager(private val executorService: ExecutorService) {
private class State {
val registeredShutdowns = ArrayList<CordaFuture<() -> Unit>>()
var isShuttingDown = false
var isShutdown = false
}
@ -32,6 +36,7 @@ class ShutdownManager(private val executorService: ExecutorService) {
}
fun shutdown() {
state.locked { isShuttingDown = true }
val shutdownActionFutures = state.locked {
if (isShutdown) {
emptyList<CordaFuture<() -> Unit>>()
@ -101,4 +106,8 @@ class ShutdownManager(private val executorService: ExecutorService) {
}
}
}
fun isShuttingDown(): Boolean {
return state.locked { isShuttingDown }
}
}