Revert node suspend and resume (#744)

This commit is contained in:
cburlinchon 2018-04-17 15:14:58 +01:00 committed by GitHub
parent 0deed37569
commit 7789d5475f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 8 additions and 383 deletions

View File

@ -1,259 +0,0 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.test.node
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.services.Permissions
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions
import org.junit.ClassRule
import org.junit.Test
import java.util.*
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class NodeSuspendAndResumeTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
companion object {
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName())
}
private val rpcUser = User("user1", "test", permissions = setOf(
Permissions.startFlow<CashIssueFlow>(),
Permissions.startFlow<CashPaymentFlow>(),
Permissions.invokeRpc("vaultQueryBy"),
Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed),
Permissions.invokeRpc("vaultQueryByCriteria"))
)
@Test
fun `start suspend resume`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
(startedNode.network as P2PMessagingClient).runningFuture.get()
for (i in 1..10) {
node.suspend()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(startedNode.network as P2PMessagingClient).runningFuture.get()
}
node.stop()
}
@Test
fun `start suspend resume issuing cash`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
(startedNode.network as P2PMessagingClient).runningFuture.get()
for (i in 1..10) {
node.suspend()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(startedNode.network as P2PMessagingClient).runningFuture.get()
issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals((123 * i).DOLLARS, currentCashAmount)
}
node.stop()
}
@Test
fun `cash not issued when suspended`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
(startedNode.network as P2PMessagingClient).runningFuture.get()
issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME))
var currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.suspend()
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
issueCash(node, startedNode.info.identityFromX500Name(ALICE_NAME))
}
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(startedNode.network as P2PMessagingClient).runningFuture.get()
currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.stop()
}
@Test
fun `initialise node without starting`() {
val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
// The node hasn't been started yet
Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
}
node.start()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.stop()
}
@Test
fun `resume called on node not previously started`() {
val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
// will start the node
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.stop()
}
@Test
fun `resume called when node not suspended`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
node.stop()
node.resume()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.stop()
}
@Test
fun `resume called on started node`() {
val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
node.start()
node.resume()
thread(name = ALICE_NAME.organisation) {
node.run()
}
(node.started!!.network as P2PMessagingClient).runningFuture.get()
issueCash(node, node.started!!.info.identityFromX500Name(ALICE_NAME))
val currentCashAmount = getCashBalance(node)
println("Balance: $currentCashAmount")
assertEquals(123.DOLLARS, currentCashAmount)
node.stop()
}
@Test
fun `suspend called when node not started`() {
val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
val node = startedNode.internals
node.stop()
node.suspend()
Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
}
node.suspend()
Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
}
node.stop()
}
private fun issueCash(node: Node, party: Party) {
val client = CordaRPCClient(node.configuration.rpcOptions.address!!)
val connection = client.start(rpcUser.username, rpcUser.password)
val proxy = connection.proxy
val flowHandle = proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), party)
println("Started issuing cash, waiting on result")
flowHandle.returnValue.get()
val cashDollars = proxy.getCashBalance(USD)
println("Balance: $cashDollars")
connection.close()
}
private fun getCashBalance(node: Node): Amount<Currency> {
val client = CordaRPCClient(node.configuration.rpcOptions.address!!)
val connection = client.start(rpcUser.username, rpcUser.password)
val proxy = connection.proxy
val cashBalance = proxy.getCashBalance(USD)
connection.close()
return cashBalance
}
}

View File

@ -146,8 +146,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val services: ServiceHubInternal get() = _services protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl private lateinit var _services: ServiceHubInternalImpl
protected lateinit var smm: StateMachineManager
protected lateinit var schedulerService: NodeSchedulerService
protected var myNotaryIdentity: PartyAndCertificate? = null protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage protected lateinit var checkpointStorage: CheckpointStorage
private lateinit var tokenizableServices: List<Any> private lateinit var tokenizableServices: List<Any>
@ -255,10 +253,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start() mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
} }
val notaryService = makeNotaryService(nodeServices, database) val notaryService = makeNotaryService(nodeServices, database)
smm = makeStateMachineManager(database) val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
schedulerService = NodeSchedulerService( val schedulerService = NodeSchedulerService(
platformClock, platformClock,
database, database,
flowStarter, flowStarter,

View File

@ -396,24 +396,6 @@ open class Node(configuration: NodeConfiguration,
return started return started
} }
/**
* Resume services stopped after [suspend].
*/
fun resume() {
if (started == null) {
start()
} else if (suspended) {
bridgeControlListener?.start()
rpcMessagingClient?.resume(started!!.rpcOps, securityManager)
(network as P2PMessagingClient).start()
started!!.database.transaction {
smm.resume()
schedulerService.resume()
}
suspended = false
}
}
override fun getRxIoScheduler(): Scheduler = Schedulers.io() override fun getRxIoScheduler(): Scheduler = Schedulers.io()
private fun initialiseSerialization() { private fun initialiseSerialization() {
@ -464,20 +446,4 @@ open class Node(configuration: NodeConfiguration,
log.info("Shutdown complete") log.info("Shutdown complete")
} }
private var suspended = false
/**
* Suspend the minimum number of services([schedulerService], [smm], [network], [rpcMessagingClient], and [bridgeControlListener]).
*/
fun suspend() {
if(started != null && !suspended) {
schedulerService.stop()
smm.stop(0)
(network as P2PMessagingClient).stop()
rpcMessagingClient?.stop()
bridgeControlListener?.stop()
suspended = true
}
}
} }

View File

@ -23,11 +23,6 @@ interface RPCSecurityManager : AutoCloseable {
*/ */
val id: AuthServiceId val id: AuthServiceId
/**
* Resume
*/
fun resume()
/** /**
* Perform user authentication from principal and password. Return an [AuthorizingSubject] containing * Perform user authentication from principal and password. Return an [AuthorizingSubject] containing
* the permissions of the user identified by the given [principal] if authentication via password succeeds, * the permissions of the user identified by the given [principal] if authentication via password succeeds,

View File

@ -45,20 +45,15 @@ private typealias AuthServiceConfig = SecurityConfiguration.AuthService
* Default implementation of [RPCSecurityManager] adapting * Default implementation of [RPCSecurityManager] adapting
* [org.apache.shiro.mgt.SecurityManager] * [org.apache.shiro.mgt.SecurityManager]
*/ */
class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurityManager { class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
override val id = config.id override val id = config.id
private var manager: DefaultSecurityManager private val manager: DefaultSecurityManager
init { init {
manager = buildImpl(config) manager = buildImpl(config)
} }
override fun resume() {
close()
manager = buildImpl(config)
}
@Throws(FailedLoginException::class) @Throws(FailedLoginException::class)
override fun authenticate(principal: String, password: Password): AuthorizingSubject { override fun authenticate(principal: String, password: Password): AuthorizingSubject {
password.use { password.use {
@ -91,12 +86,8 @@ class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurit
/** /**
* Instantiate RPCSecurityManager initialised with users data from a list of [User] * Instantiate RPCSecurityManager initialised with users data from a list of [User]
*/ */
fun fromUserList(id: AuthServiceId, users: List<User>): RPCSecurityManagerImpl { fun fromUserList(id: AuthServiceId, users: List<User>) =
val rpcSecurityManagerImpl = RPCSecurityManagerImpl( RPCSecurityManagerImpl(AuthServiceConfig.fromUsers(users).copy(id = id))
AuthServiceConfig.fromUsers(users).copy(id = id))
return rpcSecurityManagerImpl
}
// Build internal Shiro securityManager instance // Build internal Shiro securityManager instance
private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager { private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager {

View File

@ -188,29 +188,6 @@ class NodeSchedulerService(private val clock: CordaClock,
} }
} }
/**
* Stop scheduler service.
*/
fun stop() {
mutex.locked {
schedulerTimerExecutor.shutdown()
scheduledStatesQueue.clear()
scheduledStates.clear()
}
}
/**
* Resume scheduler service after having called [stop].
*/
fun resume() {
mutex.locked {
schedulerTimerExecutor = Executors.newSingleThreadExecutor()
scheduledStates.putAll(createMap())
scheduledStatesQueue.addAll(scheduledStates.values)
rescheduleWakeUp()
}
}
override fun scheduleStateActivity(action: ScheduledStateRef) { override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" } log.trace { "Schedule $action" }
val previousState = scheduledStates[action.ref] val previousState = scheduledStates[action.ref]
@ -250,7 +227,7 @@ class NodeSchedulerService(private val clock: CordaClock,
} }
} }
private var schedulerTimerExecutor = Executors.newSingleThreadExecutor() private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/** /**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the * This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new * [awaitWithDeadline] used below drops through without running the action. We then create a new

View File

@ -15,7 +15,6 @@ import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
@ -353,8 +352,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val shutdownLatch = CountDownLatch(1) private val shutdownLatch = CountDownLatch(1)
var runningFuture = openFuture<Unit>()
/** /**
* Starts the p2p event loop: this method only returns once [stop] has been called. * Starts the p2p event loop: this method only returns once [stop] has been called.
*/ */
@ -365,7 +362,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) { "start must be called first" } check(started) { "start must be called first" }
check(!running) { "run can't be called twice" } check(!running) { "run can't be called twice" }
running = true running = true
runningFuture.set(Unit)
// If it's null, it means we already called stop, so return immediately. // If it's null, it means we already called stop, so return immediately.
if (p2pConsumer == null) { if (p2pConsumer == null) {
return return
@ -484,7 +480,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) check(started)
val prevRunning = running val prevRunning = running
running = false running = false
runningFuture = openFuture()
networkChangeSubscription?.unsubscribe() networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, { "stop can't be called twice" }) require(p2pConsumer != null, { "stop can't be called twice" })
require(producer != null, { "stop can't be called twice" }) require(producer != null, { "stop can't be called twice" })

View File

@ -40,11 +40,6 @@ class RPCMessagingClient(
rpcServer!!.start(serverControl) rpcServer!!.start(serverControl)
} }
fun resume(rpcOps: RPCOps, securityManager: RPCSecurityManager) = synchronized(this) {
start(rpcOps, securityManager)
securityManager.resume()
}
fun stop() = synchronized(this) { fun stop() = synchronized(this) {
rpcServer?.close() rpcServer?.close()
artemis.stop() artemis.stop()

View File

@ -149,19 +149,6 @@ class MultiThreadedStateMachineManager(
lifeCycle.transition(State.UNSTARTED, State.STARTED) lifeCycle.transition(State.UNSTARTED, State.STARTED)
} }
override fun resume() {
lifeCycle.requireState(State.STOPPED)
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
val fibers = restoreFlowsFromCheckpoints()
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
serviceHub.networkMapCache.nodeReady.then {
resumeRestoredFlows(fibers)
}
lifeCycle.transition(State.STOPPED, State.STARTED)
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> { override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return concurrentBox.content.flows.values.mapNotNull { return concurrentBox.content.flows.values.mapNotNull {
flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture } flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture }

View File

@ -143,20 +143,6 @@ class SingleThreadedStateMachineManager(
} }
} }
override fun resume() {
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
val fibers = restoreFlowsFromCheckpoints()
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
}
serviceHub.networkMapCache.nodeReady.then {
resumeRestoredFlows(fibers)
}
mutex.locked {
stopping = false
}
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> { override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked { return mutex.locked {
flows.values.mapNotNull { flows.values.mapNotNull {

View File

@ -49,11 +49,6 @@ interface StateMachineManager {
*/ */
fun stop(allowedUnsuspendedFiberCount: Int) fun stop(allowedUnsuspendedFiberCount: Int)
/**
* Resume state machine manager after having called [stop].
*/
fun resume()
/** /**
* Starts a new flow. * Starts a new flow.
* *

View File

@ -99,7 +99,6 @@ class FiberDeserializationChecker {
fun stop(): Boolean { fun stop(): Boolean {
jobQueue.add(Job.Finish) jobQueue.add(Job.Finish)
checkerThread?.join() checkerThread?.join()
checkerThread = null
return foundUnrestorableFibers return foundUnrestorableFibers
} }
} }

View File

@ -99,7 +99,7 @@ class VaultSoftLockManagerTest {
return object : VaultServiceInternal by realVault { return object : VaultServiceInternal by realVault {
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) { override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
// Should be called before flow is removed // Should be called before flow is removed
assertEquals(1, node.smm.allStateMachines.size) assertEquals(1, node.started!!.smm.allStateMachines.size)
mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests. mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests.
} }
} }