Various structural cleanups of node-driver:

* Extracted out ShutdownManager into its own file
* Moved RPCDriver and ProcessUtilities into internal package
* Made n.c.testing.performance package internal
This commit is contained in:
Shams Asari
2017-11-06 20:41:06 +00:00
parent e6feca2f03
commit e26e41a384
21 changed files with 152 additions and 131 deletions

View File

@ -1,4 +1,4 @@
package net.corda.testing.driver
package net.corda.testing.internal
import org.apache.commons.io.FileUtils
import org.junit.Rule

View File

@ -45,7 +45,6 @@ fun ledger(
*/
@JvmOverloads
fun transaction(
transactionLabel: String? = null,
transactionBuilder: TransactionBuilder = TransactionBuilder(notary = DUMMY_NOTARY),
initialiseSerialization: Boolean = true,
cordappPackages: List<String> = emptyList(),

View File

@ -37,6 +37,7 @@ import net.corda.nodeapi.internal.addShutdownHook
import net.corda.testing.*
import net.corda.testing.common.internal.NetworkParametersCopier
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.internal.ProcessUtilities
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.NotarySpec
@ -54,12 +55,10 @@ import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
@ -559,102 +558,6 @@ fun <A> poll(
return resultFuture
}
class ShutdownManager(private val executorService: ExecutorService) {
private class State {
val registeredShutdowns = ArrayList<CordaFuture<() -> Unit>>()
var isShutdown = false
}
private val state = ThreadBox(State())
companion object {
inline fun <A> run(providedExecutorService: ExecutorService? = null, block: ShutdownManager.() -> A): A {
val executorService = providedExecutorService ?: Executors.newScheduledThreadPool(1)
val shutdownManager = ShutdownManager(executorService)
try {
return block(shutdownManager)
} finally {
shutdownManager.shutdown()
providedExecutorService ?: executorService.shutdown()
}
}
}
fun shutdown() {
val shutdownActionFutures = state.locked {
if (isShutdown) {
emptyList<CordaFuture<() -> Unit>>()
} else {
isShutdown = true
registeredShutdowns
}
}
val shutdowns = shutdownActionFutures.map { Try.on { it.getOrThrow(1.seconds) } }
shutdowns.reversed().forEach {
when (it) {
is Try.Success ->
try {
it.value()
} catch (t: Throwable) {
log.warn("Exception while shutting down", t)
}
is Try.Failure -> log.warn("Exception while getting shutdown method, disregarding", it.exception)
}
}
}
fun registerShutdown(shutdown: CordaFuture<() -> Unit>) {
state.locked {
require(!isShutdown)
registeredShutdowns.add(shutdown)
}
}
fun registerShutdown(shutdown: () -> Unit) = registerShutdown(doneFuture(shutdown))
fun registerProcessShutdown(processFuture: CordaFuture<Process>) {
val processShutdown = processFuture.map { process ->
{
process.destroy()
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit {
process.waitFor()
}
try {
finishedFuture.get(5, SECONDS)
} catch (exception: TimeoutException) {
finishedFuture.cancel(true)
process.destroyForcibly()
}
Unit
}
}
registerShutdown(processShutdown)
}
interface Follower {
fun unfollow()
fun shutdown()
}
fun follower() = object : Follower {
private val start = state.locked { registeredShutdowns.size }
private val end = AtomicInteger(start - 1)
override fun unfollow() = end.set(state.locked { registeredShutdowns.size })
override fun shutdown() = end.get().let { end ->
start > end && throw IllegalStateException("You haven't called unfollow.")
state.locked {
registeredShutdowns.subList(start, end).listIterator(end - start).run {
while (hasPrevious()) {
previous().getOrThrow().invoke()
set(doneFuture {}) // Don't break other followers by doing a remove.
}
}
}
}
}
}
class DriverDSL(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,

View File

@ -0,0 +1,113 @@
package net.corda.testing.driver
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.utilities.Try
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
class ShutdownManager(private val executorService: ExecutorService) {
private class State {
val registeredShutdowns = ArrayList<CordaFuture<() -> Unit>>()
var isShutdown = false
}
private val state = ThreadBox(State())
companion object {
private val log = loggerFor<ShutdownManager>()
inline fun <A> run(providedExecutorService: ExecutorService? = null, block: ShutdownManager.() -> A): A {
val executorService = providedExecutorService ?: Executors.newScheduledThreadPool(1)
val shutdownManager = ShutdownManager(executorService)
try {
return block(shutdownManager)
} finally {
shutdownManager.shutdown()
providedExecutorService ?: executorService.shutdown()
}
}
}
fun shutdown() {
val shutdownActionFutures = state.locked {
if (isShutdown) {
emptyList<CordaFuture<() -> Unit>>()
} else {
isShutdown = true
registeredShutdowns
}
}
val shutdowns = shutdownActionFutures.map { Try.on { it.getOrThrow(1.seconds) } }
shutdowns.reversed().forEach {
when (it) {
is Try.Success ->
try {
it.value()
} catch (t: Throwable) {
log.warn("Exception while shutting down", t)
}
is Try.Failure -> log.warn("Exception while getting shutdown method, disregarding", it.exception)
}
}
}
fun registerShutdown(shutdown: CordaFuture<() -> Unit>) {
state.locked {
require(!isShutdown)
registeredShutdowns.add(shutdown)
}
}
fun registerShutdown(shutdown: () -> Unit) = registerShutdown(doneFuture(shutdown))
fun registerProcessShutdown(processFuture: CordaFuture<Process>) {
val processShutdown = processFuture.map { process ->
{
process.destroy()
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = executorService.submit {
process.waitFor()
}
try {
finishedFuture.get(5, TimeUnit.SECONDS)
} catch (exception: TimeoutException) {
finishedFuture.cancel(true)
process.destroyForcibly()
}
Unit
}
}
registerShutdown(processShutdown)
}
interface Follower {
fun unfollow()
fun shutdown()
}
fun follower() = object : Follower {
private val start = state.locked { registeredShutdowns.size }
private val end = AtomicInteger(start - 1)
override fun unfollow() = end.set(state.locked { registeredShutdowns.size })
override fun shutdown() = end.get().let { end ->
start > end && throw IllegalStateException("You haven't called unfollow.")
state.locked {
registeredShutdowns.subList(start, end).listIterator(end - start).run {
while (hasPrevious()) {
previous().getOrThrow().invoke()
set(doneFuture {}) // Don't break other followers by doing a remove.
}
}
}
}
}
}

View File

@ -1,4 +1,4 @@
package net.corda.testing.driver
package net.corda.testing.internal
import net.corda.core.internal.div
import net.corda.core.internal.exists

View File

@ -1,4 +1,4 @@
package net.corda.testing
package net.corda.testing.internal
import net.corda.client.mock.Generator
import net.corda.client.rpc.internal.KryoClientSerializationScheme

View File

@ -1,4 +1,4 @@
package net.corda.testing.performance
package net.corda.testing.internal.performance
import com.codahale.metrics.Gauge
import com.codahale.metrics.MetricRegistry

View File

@ -1,4 +1,4 @@
package net.corda.testing.performance
package net.corda.testing.internal.performance
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.JmxReporter