diff --git a/node/src/integration-test/kotlin/net/corda/test/node/NodeStartAndStopTest.kt b/node/src/integration-test/kotlin/net/corda/test/node/NodeStartAndStopTest.kt deleted file mode 100644 index 311a126fee..0000000000 --- a/node/src/integration-test/kotlin/net/corda/test/node/NodeStartAndStopTest.kt +++ /dev/null @@ -1,26 +0,0 @@ -package net.corda.test.node - -import net.corda.core.utilities.getOrThrow -import net.corda.testing.core.ALICE_NAME -import net.corda.testing.internal.IntegrationTestSchemas -import net.corda.testing.internal.toDatabaseSchemaName -import net.corda.testing.node.internal.NodeBasedTest -import org.junit.ClassRule -import org.junit.Test - -class NodeStartAndStopTest : NodeBasedTest() { - companion object { - @ClassRule @JvmField - val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName()) - } - - @Test - fun `start stop start`() { - val node = startNode(ALICE_NAME) - node.internals.startupComplete.get() - node.internals.stop() - - node.internals.start() - node.internals.startupComplete.getOrThrow() - } -} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt b/node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt new file mode 100644 index 0000000000..a44d85c915 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt @@ -0,0 +1,226 @@ +package net.corda.test.node + +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.contracts.Amount +import net.corda.core.identity.Party +import net.corda.core.internal.packageName +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.OpaqueBytes +import net.corda.finance.DOLLARS +import net.corda.finance.USD +import net.corda.finance.contracts.getCashBalance +import net.corda.finance.flows.CashIssueFlow +import net.corda.finance.flows.CashPaymentFlow +import net.corda.finance.schemas.CashSchemaV1 +import net.corda.node.internal.Node +import net.corda.node.services.Permissions +import net.corda.node.services.messaging.P2PMessagingClient +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.node.User +import net.corda.testing.node.internal.NodeBasedTest +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.assertj.core.api.Assertions +import org.junit.Test +import java.util.* +import kotlin.concurrent.thread +import kotlin.test.assertEquals + +class NodeSuspendAndResumeTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) { + + private val rpcUser = User("user1", "test", permissions = setOf( + Permissions.startFlow(), + Permissions.startFlow(), + Permissions.invokeRpc("vaultQueryBy"), + Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed), + Permissions.invokeRpc("vaultQueryByCriteria")) + ) + + @Test + fun `start suspend resume`() { + val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + val node = startedNode.internals + (startedNode.network as P2PMessagingClient).runningFuture.get() + + for (i in 1..10) { + node.suspend() + node.resume() + thread(name = ALICE_NAME.organisation) { + node.run() + } + (startedNode.network as P2PMessagingClient).runningFuture.get() + } + } + + @Test + fun `start suspend resume issuing cash`() { + val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + val node = startedNode.internals + (startedNode.network as P2PMessagingClient).runningFuture.get() + + for (i in 1..10) { + node.suspend() + node.resume() + thread(name = ALICE_NAME.organisation) { + node.run() + } + (startedNode.network as P2PMessagingClient).runningFuture.get() + + issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME)) + val currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals((123 * i).DOLLARS, currentCashAmount) + } + } + + @Test + fun `cash not issued when suspended`() { + val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + val node = startedNode.internals + (startedNode.network as P2PMessagingClient).runningFuture.get() + + issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME)) + + var currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals(123.DOLLARS, currentCashAmount) + + node.suspend() + Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { + issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME)) + } + + node.resume() + thread(name = ALICE_NAME.organisation) { + node.run() + } + (startedNode.network as P2PMessagingClient).runningFuture.get() + + currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals(123.DOLLARS, currentCashAmount) + } + + @Test + fun `initialise node without starting`() { + val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + + // The node hasn't been started yet + Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { + issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME)) + } + + node.start() + thread(name = ALICE_NAME.organisation) { + node.run() + } + (node.started!!.network as P2PMessagingClient).runningFuture.get() + + issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) + + val currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals(123.DOLLARS, currentCashAmount) + } + + @Test + fun `resume called on node not previously started`() { + val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + + // will start the node + node.resume() + + thread(name = ALICE_NAME.organisation) { + node.run() + } + (node.started!!.network as P2PMessagingClient).runningFuture.get() + + issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) + + val currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals(123.DOLLARS, currentCashAmount) + } + + @Test + fun `resume called when node not suspended`() { + val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + val node = startedNode.internals + + node.stop() + node.resume() + node.resume() + + thread(name = ALICE_NAME.organisation) { + node.run() + } + (node.started!!.network as P2PMessagingClient).runningFuture.get() + + issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) + + val currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals(123.DOLLARS, currentCashAmount) + } + + @Test + fun `resume called on started node`() { + val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + + node.start() + node.resume() + + thread(name = ALICE_NAME.organisation) { + node.run() + } + (node.started!!.network as P2PMessagingClient).runningFuture.get() + + issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) + + val currentCashAmount = getCashBalance(node) + println("Balance: $currentCashAmount") + assertEquals(123.DOLLARS, currentCashAmount) + } + + @Test + fun `suspend called when node not started`() { + val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) + val node = startedNode.internals + + node.stop() + node.suspend() + + Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { + issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME)) + } + + node.suspend() + + Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { + issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME)) + } + } + + private fun issueCash(node: Node, party: Party) { + val client = CordaRPCClient(node.configuration.rpcOptions.address!!) + val connection = client.start(rpcUser.username, rpcUser.password) + val proxy = connection.proxy + val flowHandle = proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), party) + println("Started issuing cash, waiting on result") + flowHandle.returnValue.get() + + val cashDollars = proxy.getCashBalance(USD) + println("Balance: $cashDollars") + connection.close() + } + + private fun getCashBalance(node: Node): Amount { + val client = CordaRPCClient(node.configuration.rpcOptions.address!!) + val connection = client.start(rpcUser.username, rpcUser.password) + val proxy = connection.proxy + val cashBalance = proxy.getCashBalance(USD) + connection.close() + return cashBalance + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 383ef96db4..58eee3fb4e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -131,6 +131,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val services: ServiceHubInternal get() = _services private lateinit var _services: ServiceHubInternalImpl + protected lateinit var smm: StateMachineManager + protected lateinit var schedulerService: NodeSchedulerService protected var myNotaryIdentity: PartyAndCertificate? = null protected lateinit var checkpointStorage: CheckpointStorage private lateinit var tokenizableServices: List @@ -225,10 +227,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start() } val notaryService = makeNotaryService(nodeServices, database) - val smm = makeStateMachineManager(database) + smm = makeStateMachineManager(database) val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) - val schedulerService = NodeSchedulerService( + schedulerService = NodeSchedulerService( platformClock, database, flowStarter, diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 203707be8f..fc67d2e1ae 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -2,6 +2,7 @@ package net.corda.node.internal import com.codahale.metrics.JmxReporter import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.thenMatch import net.corda.core.internal.div @@ -389,6 +390,24 @@ open class Node(configuration: NodeConfiguration, return started } + /** + * Resume services stopped after [suspend]. + */ + fun resume() { + if (started == null) { + start() + } else if (suspended) { + bridgeControlListener?.start() + rpcMessagingClient?.resume(started!!.rpcOps, securityManager) + (network as P2PMessagingClient).start() + started!!.database.transaction { + smm.resume() + schedulerService.resume() + } + suspended = false + } + } + override fun getRxIoScheduler(): Scheduler = Schedulers.io() private fun initialiseSerialization() { @@ -407,6 +426,7 @@ open class Node(configuration: NodeConfiguration, private var rpcMessagingClient: RPCMessagingClient? = null private var verifierMessagingClient: VerifierMessagingClient? = null + /** Starts a blocking event loop for message dispatch. */ fun run() { rpcMessagingClient?.start2(rpcBroker!!.serverControl) @@ -436,4 +456,20 @@ open class Node(configuration: NodeConfiguration, log.info("Shutdown complete") } + + private var suspended = false + + /** + * Suspend the minimum number of services([schedulerService], [smm], [network], [rpcMessagingClient], and [bridgeControlListener]). + */ + fun suspend() { + if(started != null && !suspended) { + schedulerService.stop() + smm.stop(0) + (network as P2PMessagingClient).stop() + rpcMessagingClient?.stop() + bridgeControlListener?.stop() + suspended = true + } + } } diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt index 4307df44e0..35f812b687 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt @@ -13,6 +13,11 @@ interface RPCSecurityManager : AutoCloseable { */ val id: AuthServiceId + /** + * Resume + */ + fun resume() + /** * Perform user authentication from principal and password. Return an [AuthorizingSubject] containing * the permissions of the user identified by the given [principal] if authentication via password succeeds, diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt index 4b1daabdde..38c62cd516 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt @@ -34,15 +34,20 @@ private typealias AuthServiceConfig = SecurityConfiguration.AuthService * Default implementation of [RPCSecurityManager] adapting * [org.apache.shiro.mgt.SecurityManager] */ -class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager { +class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurityManager { override val id = config.id - private val manager: DefaultSecurityManager + private var manager: DefaultSecurityManager init { manager = buildImpl(config) } + override fun resume() { + close() + manager = buildImpl(config) + } + @Throws(FailedLoginException::class) override fun authenticate(principal: String, password: Password): AuthorizingSubject { password.use { @@ -75,9 +80,12 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager { /** * Instantiate RPCSecurityManager initialised with users data from a list of [User] */ - fun fromUserList(id: AuthServiceId, users: List) = - RPCSecurityManagerImpl( + fun fromUserList(id: AuthServiceId, users: List): RPCSecurityManagerImpl { + val rpcSecurityManagerImpl = RPCSecurityManagerImpl( AuthServiceConfig.fromUsers(users).copy(id = id)) + return rpcSecurityManagerImpl + } + // Build internal Shiro securityManager instance private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager { diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index f6bf1630fa..f6335ca44a 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -171,6 +171,29 @@ class NodeSchedulerService(private val clock: CordaClock, } } + /** + * Stop scheduler service. + */ + fun stop() { + mutex.locked { + schedulerTimerExecutor.shutdown() + scheduledStatesQueue.clear() + scheduledStates.clear() + } + } + + /** + * Resume scheduler service after having called [stop]. + */ + fun resume() { + mutex.locked { + schedulerTimerExecutor = Executors.newSingleThreadExecutor() + scheduledStates.putAll(createMap()) + scheduledStatesQueue.addAll(scheduledStates.values) + rescheduleWakeUp() + } + } + override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } val previousState = scheduledStates[action.ref] @@ -210,7 +233,7 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() + private var schedulerTimerExecutor = Executors.newSingleThreadExecutor() /** * This method first cancels the [java.util.concurrent.Future] for any pending action so that the * [awaitWithDeadline] used below drops through without running the action. We then create a new diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index da91235355..03f04e5419 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox +import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient @@ -357,6 +358,8 @@ class P2PMessagingClient(val config: NodeConfiguration, private val shutdownLatch = CountDownLatch(1) + var runningFuture = openFuture() + /** * Starts the p2p event loop: this method only returns once [stop] has been called. */ @@ -367,6 +370,7 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true + runningFuture.set(Unit) // If it's null, it means we already called stop, so return immediately. if (p2pConsumer == null) { return @@ -479,6 +483,7 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) val prevRunning = running running = false + runningFuture = openFuture() networkChangeSubscription?.unsubscribe() require(p2pConsumer != null, {"stop can't be called twice"}) require(producer != null, {"stop can't be called twice"}) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt index 7fbd823671..95ffb223b6 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt @@ -30,6 +30,11 @@ class RPCMessagingClient( rpcServer!!.start(serverControl) } + fun resume(rpcOps: RPCOps, securityManager: RPCSecurityManager) = synchronized(this) { + start(rpcOps, securityManager) + securityManager.resume() + } + fun stop() = synchronized(this) { rpcServer?.close() artemis.stop() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index ad14c1c36d..63c9c39931 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -138,6 +138,19 @@ class MultiThreadedStateMachineManager( lifeCycle.transition(State.UNSTARTED, State.STARTED) } + override fun resume() { + lifeCycle.requireState(State.STOPPED) + fiberDeserializationChecker?.start(checkpointSerializationContext!!) + val fibers = restoreFlowsFromCheckpoints() + Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> + (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) + } + serviceHub.networkMapCache.nodeReady.then { + resumeRestoredFlows(fibers) + } + lifeCycle.transition(State.STOPPED, State.STARTED) + } + override fun > findStateMachines(flowClass: Class): List>> { return concurrentBox.content.flows.values.mapNotNull { flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index e1f308f881..19b1c20635 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -134,6 +134,20 @@ class SingleThreadedStateMachineManager( } } + override fun resume() { + fiberDeserializationChecker?.start(checkpointSerializationContext!!) + val fibers = restoreFlowsFromCheckpoints() + Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> + (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) + } + serviceHub.networkMapCache.nodeReady.then { + resumeRestoredFlows(fibers) + } + mutex.locked { + stopping = false + } + } + override fun > findStateMachines(flowClass: Class): List>> { return mutex.locked { flows.values.mapNotNull { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 924de20d51..62867814f5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -38,6 +38,11 @@ interface StateMachineManager { */ fun stop(allowedUnsuspendedFiberCount: Int) + /** + * Resume state machine manager after having called [stop]. + */ + fun resume() + /** * Starts a new flow. * diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt index cbde382f4d..2a9b96cc5b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt @@ -89,6 +89,7 @@ class FiberDeserializationChecker { fun stop(): Boolean { jobQueue.add(Job.Finish) checkerThread?.join() + checkerThread = null return foundUnrestorableFibers } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt index 15a94fb60d..b90411b706 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt @@ -46,7 +46,7 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi val tempFolder = TemporaryFolder() private lateinit var defaultNetworkParameters: NetworkParametersCopier - private val nodes = mutableListOf>() + private val startedNodes = mutableListOf>() private val nodeInfos = mutableListOf() init { @@ -64,17 +64,17 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi */ @After fun stopAllNodes() { - val shutdownExecutor = Executors.newScheduledThreadPool(nodes.size) + val shutdownExecutor = Executors.newScheduledThreadPool(startedNodes.size) try { - nodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow() + startedNodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow() // Wait until ports are released - val portNotBoundChecks = nodes.flatMap { + val portNotBoundChecks = startedNodes.flatMap { listOf( it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }, it.internals.configuration.rpcOptions.address?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) } ) }.filterNotNull() - nodes.clear() + startedNodes.clear() portNotBoundChecks.transpose().getOrThrow() } finally { shutdownExecutor.shutdown() @@ -82,10 +82,10 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi } @JvmOverloads - fun startNode(legalName: CordaX500Name, - platformVersion: Int = 1, - rpcUsers: List = emptyList(), - configOverrides: Map = emptyMap()): StartedNode { + fun initNode(legalName: CordaX500Name, + platformVersion: Int = 1, + rpcUsers: List = emptyList(), + configOverrides: Map = emptyMap()): Node { val baseDirectory = baseDirectory(legalName).createDirectories() val localPort = getFreeLocalPorts("localhost", 3) val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString() @@ -109,14 +109,24 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi } } defaultNetworkParameters.install(baseDirectory) - val node = InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages).start() - nodes += node + + return InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages) + } + + @JvmOverloads + fun startNode(legalName: CordaX500Name, + platformVersion: Int = 1, + rpcUsers: List = emptyList(), + configOverrides: Map = emptyMap()): StartedNode { + val node = initNode(legalName,platformVersion, rpcUsers,configOverrides) + val startedNode = node.start() + startedNodes += startedNode ensureAllNetworkMapCachesHaveAllNodeInfos() thread(name = legalName.organisation) { - node.internals.run() + node.run() } - return node + return startedNode } protected fun baseDirectory(legalName: CordaX500Name): Path { @@ -124,7 +134,7 @@ abstract class NodeBasedTest(private val cordappPackages: List = emptyLi } private fun ensureAllNetworkMapCachesHaveAllNodeInfos() { - val runningNodes = nodes.filter { it.internals.started != null } + val runningNodes = startedNodes.filter { it.internals.started != null } val runningNodesInfo = runningNodes.map { it.info } for (node in runningNodes) for (nodeInfo in runningNodesInfo) {