From 7789d5475f12ffc307502cf8be0a3e6a0417fb88 Mon Sep 17 00:00:00 2001 From: cburlinchon Date: Tue, 17 Apr 2018 15:14:58 +0100 Subject: [PATCH] Revert node suspend and resume (#744) --- .../test/node/NodeSuspendAndResumeTest.kt | 259 ------------------ .../net/corda/node/internal/AbstractNode.kt | 6 +- .../kotlin/net/corda/node/internal/Node.kt | 34 --- .../internal/security/RPCSecurityManager.kt | 5 - .../security/RPCSecurityManagerImpl.kt | 17 +- .../services/events/NodeSchedulerService.kt | 25 +- .../services/messaging/P2PMessagingClient.kt | 5 - .../services/messaging/RPCMessagingClient.kt | 5 - .../MultiThreadedStateMachineManager.kt | 13 - .../SingleThreadedStateMachineManager.kt | 14 - .../statemachine/StateMachineManager.kt | 5 - ...FiberDeserializationCheckingInterceptor.kt | 1 - .../vault/VaultSoftLockManagerTest.kt | 2 +- 13 files changed, 8 insertions(+), 383 deletions(-) delete mode 100644 node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt diff --git a/node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt b/node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt deleted file mode 100644 index 1bf74a8c88..0000000000 --- a/node/src/integration-test/kotlin/net/corda/test/node/NodeSuspendAndResumeTest.kt +++ /dev/null @@ -1,259 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.test.node - -import net.corda.client.rpc.CordaRPCClient -import net.corda.core.contracts.Amount -import net.corda.core.identity.Party -import net.corda.core.internal.packageName -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.startFlow -import net.corda.core.utilities.OpaqueBytes -import net.corda.finance.DOLLARS -import net.corda.finance.USD -import net.corda.finance.contracts.getCashBalance -import net.corda.finance.flows.CashIssueFlow -import net.corda.finance.flows.CashPaymentFlow -import net.corda.finance.schemas.CashSchemaV1 -import net.corda.node.internal.Node -import net.corda.node.services.Permissions -import net.corda.node.services.messaging.P2PMessagingClient -import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.DUMMY_NOTARY_NAME -import net.corda.testing.internal.IntegrationTestSchemas -import net.corda.testing.internal.toDatabaseSchemaName -import net.corda.testing.node.User -import net.corda.testing.node.internal.NodeBasedTest -import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException -import org.apache.activemq.artemis.api.core.ActiveMQSecurityException -import org.assertj.core.api.Assertions -import org.junit.ClassRule -import org.junit.Test -import java.util.* -import kotlin.concurrent.thread -import kotlin.test.assertEquals - -class NodeSuspendAndResumeTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) { - companion object { - @ClassRule - @JvmField - val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName()) - } - - private val rpcUser = User("user1", "test", permissions = setOf( - Permissions.startFlow(), - Permissions.startFlow(), - Permissions.invokeRpc("vaultQueryBy"), - Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed), - Permissions.invokeRpc("vaultQueryByCriteria")) - ) - - @Test - fun `start suspend resume`() { - val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - val node = startedNode.internals - (startedNode.network as P2PMessagingClient).runningFuture.get() - - for (i in 1..10) { - node.suspend() - node.resume() - thread(name = ALICE_NAME.organisation) { - node.run() - } - (startedNode.network as P2PMessagingClient).runningFuture.get() - } - node.stop() - } - - @Test - fun `start suspend resume issuing cash`() { - val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - val node = startedNode.internals - (startedNode.network as P2PMessagingClient).runningFuture.get() - - for (i in 1..10) { - node.suspend() - node.resume() - thread(name = ALICE_NAME.organisation) { - node.run() - } - (startedNode.network as P2PMessagingClient).runningFuture.get() - - issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME)) - val currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals((123 * i).DOLLARS, currentCashAmount) - } - node.stop() - } - - @Test - fun `cash not issued when suspended`() { - val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - val node = startedNode.internals - (startedNode.network as P2PMessagingClient).runningFuture.get() - - issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME)) - - var currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals(123.DOLLARS, currentCashAmount) - - node.suspend() - Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME)) - } - - node.resume() - thread(name = ALICE_NAME.organisation) { - node.run() - } - (startedNode.network as P2PMessagingClient).runningFuture.get() - - currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals(123.DOLLARS, currentCashAmount) - - node.stop() - } - - @Test - fun `initialise node without starting`() { - val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - - // The node hasn't been started yet - Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { - issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME)) - } - - node.start() - thread(name = ALICE_NAME.organisation) { - node.run() - } - (node.started!!.network as P2PMessagingClient).runningFuture.get() - - issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) - - val currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals(123.DOLLARS, currentCashAmount) - - node.stop() - } - - @Test - fun `resume called on node not previously started`() { - val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - - // will start the node - node.resume() - - thread(name = ALICE_NAME.organisation) { - node.run() - } - (node.started!!.network as P2PMessagingClient).runningFuture.get() - - issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) - - val currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals(123.DOLLARS, currentCashAmount) - - node.stop() - } - - @Test - fun `resume called when node not suspended`() { - val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - val node = startedNode.internals - - node.stop() - node.resume() - node.resume() - - thread(name = ALICE_NAME.organisation) { - node.run() - } - (node.started!!.network as P2PMessagingClient).runningFuture.get() - - issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) - - val currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals(123.DOLLARS, currentCashAmount) - - node.stop() - } - - @Test - fun `resume called on started node`() { - val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - - node.start() - node.resume() - - thread(name = ALICE_NAME.organisation) { - node.run() - } - (node.started!!.network as P2PMessagingClient).runningFuture.get() - - issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME)) - - val currentCashAmount = getCashBalance(node) - println("Balance: $currentCashAmount") - assertEquals(123.DOLLARS, currentCashAmount) - - node.stop() - } - - @Test - fun `suspend called when node not started`() { - val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - val node = startedNode.internals - - node.stop() - node.suspend() - - Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { - issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME)) - } - - node.suspend() - - Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy { - issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME)) - } - - node.stop() - } - - private fun issueCash(node: Node, party: Party) { - val client = CordaRPCClient(node.configuration.rpcOptions.address!!) - val connection = client.start(rpcUser.username, rpcUser.password) - val proxy = connection.proxy - val flowHandle = proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), party) - println("Started issuing cash, waiting on result") - flowHandle.returnValue.get() - - val cashDollars = proxy.getCashBalance(USD) - println("Balance: $cashDollars") - connection.close() - } - - private fun getCashBalance(node: Node): Amount { - val client = CordaRPCClient(node.configuration.rpcOptions.address!!) - val connection = client.start(rpcUser.username, rpcUser.password) - val proxy = connection.proxy - val cashBalance = proxy.getCashBalance(USD) - connection.close() - return cashBalance - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 087d5c08c5..5bf2c8924f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -146,8 +146,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val services: ServiceHubInternal get() = _services private lateinit var _services: ServiceHubInternalImpl - protected lateinit var smm: StateMachineManager - protected lateinit var schedulerService: NodeSchedulerService protected var myNotaryIdentity: PartyAndCertificate? = null protected lateinit var checkpointStorage: CheckpointStorage private lateinit var tokenizableServices: List @@ -255,10 +253,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start() } val notaryService = makeNotaryService(nodeServices, database) - smm = makeStateMachineManager(database) + val smm = makeStateMachineManager(database) val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) - schedulerService = NodeSchedulerService( + val schedulerService = NodeSchedulerService( platformClock, database, flowStarter, diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 9241482528..d9874d42cd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -396,24 +396,6 @@ open class Node(configuration: NodeConfiguration, return started } - /** - * Resume services stopped after [suspend]. - */ - fun resume() { - if (started == null) { - start() - } else if (suspended) { - bridgeControlListener?.start() - rpcMessagingClient?.resume(started!!.rpcOps, securityManager) - (network as P2PMessagingClient).start() - started!!.database.transaction { - smm.resume() - schedulerService.resume() - } - suspended = false - } - } - override fun getRxIoScheduler(): Scheduler = Schedulers.io() private fun initialiseSerialization() { @@ -464,20 +446,4 @@ open class Node(configuration: NodeConfiguration, log.info("Shutdown complete") } - - private var suspended = false - - /** - * Suspend the minimum number of services([schedulerService], [smm], [network], [rpcMessagingClient], and [bridgeControlListener]). - */ - fun suspend() { - if(started != null && !suspended) { - schedulerService.stop() - smm.stop(0) - (network as P2PMessagingClient).stop() - rpcMessagingClient?.stop() - bridgeControlListener?.stop() - suspended = true - } - } } diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt index d139e74e4a..c23929f698 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManager.kt @@ -23,11 +23,6 @@ interface RPCSecurityManager : AutoCloseable { */ val id: AuthServiceId - /** - * Resume - */ - fun resume() - /** * Perform user authentication from principal and password. Return an [AuthorizingSubject] containing * the permissions of the user identified by the given [principal] if authentication via password succeeds, diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt index f8857e5c02..6102d4494f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt @@ -45,20 +45,15 @@ private typealias AuthServiceConfig = SecurityConfiguration.AuthService * Default implementation of [RPCSecurityManager] adapting * [org.apache.shiro.mgt.SecurityManager] */ -class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurityManager { +class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager { override val id = config.id - private var manager: DefaultSecurityManager + private val manager: DefaultSecurityManager init { manager = buildImpl(config) } - override fun resume() { - close() - manager = buildImpl(config) - } - @Throws(FailedLoginException::class) override fun authenticate(principal: String, password: Password): AuthorizingSubject { password.use { @@ -91,12 +86,8 @@ class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurit /** * Instantiate RPCSecurityManager initialised with users data from a list of [User] */ - fun fromUserList(id: AuthServiceId, users: List): RPCSecurityManagerImpl { - val rpcSecurityManagerImpl = RPCSecurityManagerImpl( - AuthServiceConfig.fromUsers(users).copy(id = id)) - return rpcSecurityManagerImpl - } - + fun fromUserList(id: AuthServiceId, users: List) = + RPCSecurityManagerImpl(AuthServiceConfig.fromUsers(users).copy(id = id)) // Build internal Shiro securityManager instance private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager { diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 4583018ac0..8e591e68d4 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -188,29 +188,6 @@ class NodeSchedulerService(private val clock: CordaClock, } } - /** - * Stop scheduler service. - */ - fun stop() { - mutex.locked { - schedulerTimerExecutor.shutdown() - scheduledStatesQueue.clear() - scheduledStates.clear() - } - } - - /** - * Resume scheduler service after having called [stop]. - */ - fun resume() { - mutex.locked { - schedulerTimerExecutor = Executors.newSingleThreadExecutor() - scheduledStates.putAll(createMap()) - scheduledStatesQueue.addAll(scheduledStates.values) - rescheduleWakeUp() - } - } - override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } val previousState = scheduledStates[action.ref] @@ -250,7 +227,7 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private var schedulerTimerExecutor = Executors.newSingleThreadExecutor() + private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() /** * This method first cancels the [java.util.concurrent.Future] for any pending action so that the * [awaitWithDeadline] used below drops through without running the action. We then create a new diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 48001bfd76..097e601722 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -15,7 +15,6 @@ import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox -import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient @@ -353,8 +352,6 @@ class P2PMessagingClient(val config: NodeConfiguration, private val shutdownLatch = CountDownLatch(1) - var runningFuture = openFuture() - /** * Starts the p2p event loop: this method only returns once [stop] has been called. */ @@ -365,7 +362,6 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true - runningFuture.set(Unit) // If it's null, it means we already called stop, so return immediately. if (p2pConsumer == null) { return @@ -484,7 +480,6 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) val prevRunning = running running = false - runningFuture = openFuture() networkChangeSubscription?.unsubscribe() require(p2pConsumer != null, { "stop can't be called twice" }) require(producer != null, { "stop can't be called twice" }) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt index 259b6933a8..990589d06a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt @@ -40,11 +40,6 @@ class RPCMessagingClient( rpcServer!!.start(serverControl) } - fun resume(rpcOps: RPCOps, securityManager: RPCSecurityManager) = synchronized(this) { - start(rpcOps, securityManager) - securityManager.resume() - } - fun stop() = synchronized(this) { rpcServer?.close() artemis.stop() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 9669251b89..4bc667b8a3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -149,19 +149,6 @@ class MultiThreadedStateMachineManager( lifeCycle.transition(State.UNSTARTED, State.STARTED) } - override fun resume() { - lifeCycle.requireState(State.STOPPED) - fiberDeserializationChecker?.start(checkpointSerializationContext!!) - val fibers = restoreFlowsFromCheckpoints() - Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> - (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) - } - serviceHub.networkMapCache.nodeReady.then { - resumeRestoredFlows(fibers) - } - lifeCycle.transition(State.STOPPED, State.STARTED) - } - override fun > findStateMachines(flowClass: Class): List>> { return concurrentBox.content.flows.values.mapNotNull { flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 74b80db97f..28518acb28 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -143,20 +143,6 @@ class SingleThreadedStateMachineManager( } } - override fun resume() { - fiberDeserializationChecker?.start(checkpointSerializationContext!!) - val fibers = restoreFlowsFromCheckpoints() - Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> - (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) - } - serviceHub.networkMapCache.nodeReady.then { - resumeRestoredFlows(fibers) - } - mutex.locked { - stopping = false - } - } - override fun > findStateMachines(flowClass: Class): List>> { return mutex.locked { flows.values.mapNotNull { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 51e12e57db..446e99d536 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -49,11 +49,6 @@ interface StateMachineManager { */ fun stop(allowedUnsuspendedFiberCount: Int) - /** - * Resume state machine manager after having called [stop]. - */ - fun resume() - /** * Starts a new flow. * diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt index 72f42be049..c7ded2c353 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt @@ -99,7 +99,6 @@ class FiberDeserializationChecker { fun stop(): Boolean { jobQueue.add(Job.Finish) checkerThread?.join() - checkerThread = null return foundUnrestorableFibers } } diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index d90b44c538..e4698681d6 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -99,7 +99,7 @@ class VaultSoftLockManagerTest { return object : VaultServiceInternal by realVault { override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet?) { // Should be called before flow is removed - assertEquals(1, node.smm.allStateMachines.size) + assertEquals(1, node.started!!.smm.allStateMachines.size) mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests. } }