CORDA-998 Add suspend and resume functionality to Node (#461)

* Add suspend and resume functionality to Node, stopping and restarting only the minimum number of services (P2PMessaging, RPCMessaging, NodeScheduler, StateMachine and BridgeManager)
This commit is contained in:
cburlinchon 2018-02-26 11:56:02 +00:00 committed by GitHub
parent 908a614888
commit 60c44a0358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 374 additions and 47 deletions

View File

@ -1,26 +0,0 @@
package net.corda.test.node
import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.node.internal.NodeBasedTest
import org.junit.ClassRule
import org.junit.Test
/**
 * Integration test checking that a node can be started, stopped and then started a second time.
 */
class NodeStartAndStopTest : NodeBasedTest() {
    companion object {
        /** JUnit class rule built from the database schema name derived from [ALICE_NAME]. */
        @ClassRule
        @JvmField
        val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName())
    }

    /**
     * Starts a node, waits for start-up to complete, stops it, starts it again and
     * waits for the second start-up to complete.
     */
    @Test
    fun `start stop start`() {
        val startedAlice = startNode(ALICE_NAME)
        startedAlice.internals.startupComplete.get()
        startedAlice.internals.stop()
        startedAlice.internals.start()
        startedAlice.internals.startupComplete.getOrThrow()
    }
}

View File

@ -0,0 +1,226 @@
package net.corda.test.node
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.Amount
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node
import net.corda.node.services.Permissions
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions
import org.junit.Test
import java.util.*
import kotlin.concurrent.thread
import kotlin.test.assertEquals
/**
 * Integration tests for [Node.suspend] and [Node.resume].
 *
 * Every test creates a node for [ALICE_NAME] with an RPC user allowed to start the cash flows, drives the node
 * through a sequence of start / stop / suspend / resume calls and then checks over RPC whether cash can be issued.
 */
class NodeSuspendAndResumeTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
    /** RPC user used by every test; it may start the cash flows and query the vault. */
    private val rpcUser = User("user1", "test", permissions = setOf(
            Permissions.startFlow<CashIssueFlow>(),
            Permissions.startFlow<CashPaymentFlow>(),
            Permissions.invokeRpc("vaultQueryBy"),
            Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed),
            Permissions.invokeRpc("vaultQueryByCriteria"))
    )

    /** Suspends and resumes a started node ten times, waiting for P2P messaging to be running after each cycle. */
    @Test
    fun `start suspend resume`() {
        val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
        val node = startedNode.internals
        val messaging = startedNode.network as P2PMessagingClient
        messaging.runningFuture.get()

        repeat(10) {
            node.suspend()
            node.resume()
            runInBackground(node)
            messaging.runningFuture.get()
        }
    }

    /** After each of ten suspend/resume cycles, issues 123 dollars and checks that the balance keeps accumulating. */
    @Test
    fun `start suspend resume issuing cash`() {
        val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
        val node = startedNode.internals
        val messaging = startedNode.network as P2PMessagingClient
        val alice = startedNode.info.identityFromX500Name(ALICE_NAME)
        messaging.runningFuture.get()

        for (i in 1..10) {
            node.suspend()
            node.resume()
            runInBackground(node)
            messaging.runningFuture.get()

            issueCash(node, alice)
            assertCashBalance(node, (123 * i).DOLLARS)
        }
    }

    /**
     * Issues cash once, suspends the node, checks that a further issuance attempt fails with
     * [ActiveMQSecurityException], then resumes the node and checks that the balance is unchanged.
     */
    @Test
    fun `cash not issued when suspended`() {
        val startedNode = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
        val node = startedNode.internals
        val messaging = startedNode.network as P2PMessagingClient
        val alice = startedNode.info.identityFromX500Name(ALICE_NAME)
        messaging.runningFuture.get()

        issueCash(node, alice)
        assertCashBalance(node, 123.DOLLARS)

        node.suspend()
        Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
            issueCash(node, alice)
        }

        node.resume()
        runInBackground(node)
        messaging.runningFuture.get()

        assertCashBalance(node, 123.DOLLARS)
    }

    /** A node that was only initialised rejects RPC connections; after [Node.start] cash can be issued. */
    @Test
    fun `initialise node without starting`() {
        val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
        // The node hasn't been started yet
        assertIssueCashNotConnected(node)

        node.start()
        runAndAssertCashIssued(node)
    }

    /** Calling [Node.resume] on a node that was never started starts it. */
    @Test
    fun `resume called on node not previously started`() {
        val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
        // will start the node
        node.resume()
        runAndAssertCashIssued(node)
    }

    /** Calling [Node.resume] twice after [Node.stop] leaves the node able to issue cash. */
    @Test
    fun `resume called when node not suspended`() {
        val node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)).internals
        node.stop()
        node.resume()
        node.resume()
        runAndAssertCashIssued(node)
    }

    /** Calling [Node.resume] on a node that is started but not suspended leaves it able to issue cash. */
    @Test
    fun `resume called on started node`() {
        val node = initNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
        node.start()
        node.resume()
        runAndAssertCashIssued(node)
    }

    /** Calling [Node.suspend] (twice) on a stopped node leaves RPC unreachable each time. */
    @Test
    fun `suspend called when node not started`() {
        val node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)).internals
        node.stop()
        repeat(2) {
            node.suspend()
            assertIssueCashNotConnected(node)
        }
    }

    /** Runs [node]'s blocking [Node.run] loop on a separate thread named after Alice's organisation. */
    private fun runInBackground(node: Node) {
        thread(name = ALICE_NAME.organisation) {
            node.run()
        }
    }

    /**
     * Runs [node] in the background, waits until its P2P messaging client reports that it is running,
     * issues 123 dollars to Alice and asserts that the resulting balance is exactly 123 dollars.
     */
    private fun runAndAssertCashIssued(node: Node) {
        runInBackground(node)
        val started = checkNotNull(node.started) { "Node is expected to have been started at this point" }
        (started.network as P2PMessagingClient).runningFuture.get()
        issueCash(node, started.info.identityFromX500Name(ALICE_NAME))
        assertCashBalance(node, 123.DOLLARS)
    }

    /** Asserts that an attempt to issue cash on [node] fails with [ActiveMQNotConnectedException]. */
    private fun assertIssueCashNotConnected(node: Node) {
        Assertions.assertThatExceptionOfType(ActiveMQNotConnectedException::class.java).isThrownBy {
            issueCash(node, node.generateAndSaveNodeInfo().identityFromX500Name(ALICE_NAME))
        }
    }

    /** Prints the current USD balance of [node] and asserts that it equals [expected]. */
    private fun assertCashBalance(node: Node, expected: Amount<Currency>) {
        val currentCashAmount = getCashBalance(node)
        println("Balance: $currentCashAmount")
        assertEquals(expected, currentCashAmount)
    }

    /** Builds an RPC client for the RPC address configured on [node]. */
    private fun rpcClient(node: Node): CordaRPCClient {
        val rpcAddress = checkNotNull(node.configuration.rpcOptions.address) { "Node has no RPC address configured" }
        return CordaRPCClient(rpcAddress)
    }

    /**
     * Connects to [node] over RPC as [rpcUser], issues 123 dollars to [party] and waits for the flow to finish.
     *
     * The connection is closed even when the flow or the balance query throws, so a failing
     * test does not leave an RPC connection open.
     */
    private fun issueCash(node: Node, party: Party) {
        rpcClient(node).start(rpcUser.username, rpcUser.password).use { connection ->
            val proxy = connection.proxy
            val flowHandle = proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), party)
            println("Started issuing cash, waiting on result")
            flowHandle.returnValue.get()
            val cashDollars = proxy.getCashBalance(USD)
            println("Balance: $cashDollars")
        }
    }

    /**
     * Connects to [node] over RPC as [rpcUser] and returns its USD cash balance.
     * The connection is closed even when the query throws.
     */
    private fun getCashBalance(node: Node): Amount<Currency> {
        return rpcClient(node).start(rpcUser.username, rpcUser.password).use { connection ->
            connection.proxy.getCashBalance(USD)
        }
    }
}

View File

@ -131,6 +131,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl
// State machine manager. Assigned during start-up from makeStateMachineManager(database); held as a protected
// property so that Node.suspend() and Node.resume() can stop and resume it.
protected lateinit var smm: StateMachineManager
// Scheduler service. Assigned during start-up; likewise stopped and resumed by Node.suspend() and Node.resume().
protected lateinit var schedulerService: NodeSchedulerService
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
private lateinit var tokenizableServices: List<Any>
@ -225,10 +227,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
}
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
val schedulerService = NodeSchedulerService(
schedulerService = NodeSchedulerService(
platformClock,
database,
flowStarter,

View File

@ -2,6 +2,7 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.div
@ -389,6 +390,24 @@ open class Node(configuration: NodeConfiguration,
return started
}
/**
 * Resume the services stopped by [suspend].
 *
 * - If the node has never been started ([started] is null) this simply calls [start].
 * - If the node is started but not suspended, nothing happens.
 * - Otherwise the bridge control listener, the RPC messaging client and the P2P messaging client are
 *   restarted (in that order), the state machine manager and the scheduler service are resumed inside a
 *   database transaction, and the suspended flag is cleared.
 */
fun resume() {
    val startedNode = started
    if (startedNode == null) {
        start()
        return
    }
    if (!suspended) {
        return
    }

    bridgeControlListener?.start()
    rpcMessagingClient?.resume(startedNode.rpcOps, securityManager)
    (network as P2PMessagingClient).start()
    startedNode.database.transaction {
        smm.resume()
        schedulerService.resume()
    }
    suspended = false
}
/** Returns the scheduler obtained from [Schedulers.io]. */
override fun getRxIoScheduler(): Scheduler {
    return Schedulers.io()
}
private fun initialiseSerialization() {
@ -407,6 +426,7 @@ open class Node(configuration: NodeConfiguration,
// RPC messaging client; nullable, so run(), suspend() and resume() only touch it through safe calls.
private var rpcMessagingClient: RPCMessagingClient? = null
private var verifierMessagingClient: VerifierMessagingClient? = null
/** Starts a blocking event loop for message dispatch. */
fun run() {
rpcMessagingClient?.start2(rpcBroker!!.serverControl)
@ -436,4 +456,20 @@ open class Node(configuration: NodeConfiguration,
log.info("Shutdown complete")
}
// True between a successful suspend() and the matching resume().
private var suspended = false

/**
 * Suspend the minimum number of services: [schedulerService], [smm], [network], [rpcMessagingClient]
 * and [bridgeControlListener], stopped in that order.
 *
 * Does nothing when the node has not been started or is already suspended.
 */
fun suspend() {
    if (started == null || suspended) {
        return
    }
    schedulerService.stop()
    smm.stop(0)
    (network as P2PMessagingClient).stop()
    rpcMessagingClient?.stop()
    bridgeControlListener?.stop()
    suspended = true
}
}

View File

@ -13,6 +13,11 @@ interface RPCSecurityManager : AutoCloseable {
*/
val id: AuthServiceId
/**
 * Bring this security manager back into a usable state when RPC messaging is resumed.
 *
 * Invoked by RPCMessagingClient.resume after the RPC server has been started again.
 */
fun resume()
/**
* Perform user authentication from principal and password. Return an [AuthorizingSubject] containing
* the permissions of the user identified by the given [principal] if authentication via password succeeds,

View File

@ -34,15 +34,20 @@ private typealias AuthServiceConfig = SecurityConfiguration.AuthService
* Default implementation of [RPCSecurityManager] adapting
* [org.apache.shiro.mgt.SecurityManager]
*/
class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurityManager {
override val id = config.id
private val manager: DefaultSecurityManager
private var manager: DefaultSecurityManager
init {
manager = buildImpl(config)
}
/**
 * Discards the current Shiro security manager and builds a fresh one from the stored [config].
 */
override fun resume() {
// Close the existing manager first, then replace it; the order matters because close() acts on the old instance.
close()
manager = buildImpl(config)
}
@Throws(FailedLoginException::class)
override fun authenticate(principal: String, password: Password): AuthorizingSubject {
password.use {
@ -75,9 +80,12 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig) : RPCSecurityManager {
/**
* Instantiate RPCSecurityManager initialised with users data from a list of [User]
*/
fun fromUserList(id: AuthServiceId, users: List<User>) =
RPCSecurityManagerImpl(
fun fromUserList(id: AuthServiceId, users: List<User>): RPCSecurityManagerImpl {
val rpcSecurityManagerImpl = RPCSecurityManagerImpl(
AuthServiceConfig.fromUsers(users).copy(id = id))
return rpcSecurityManagerImpl
}
// Build internal Shiro securityManager instance
private fun buildImpl(config: AuthServiceConfig): DefaultSecurityManager {

View File

@ -171,6 +171,29 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
/**
* Stop the scheduler service.
*
* While holding [mutex], shuts down the timer executor and empties both the scheduled-states queue and the
* scheduled-states map. [resume] creates a new executor and repopulates both collections.
*/
fun stop() {
mutex.locked {
// shutdown() stops the executor accepting new tasks; resume() replaces it with a new instance.
schedulerTimerExecutor.shutdown()
scheduledStatesQueue.clear()
// NOTE(review): if scheduledStates is backed by persistent storage, confirm clear() only drops the in-memory view.
scheduledStates.clear()
}
}
/**
* Resume the scheduler service after [stop] has been called.
*
* While holding [mutex], creates a new single-thread timer executor, refills the scheduled-states map from
* [createMap], rebuilds the queue from the map's values and schedules the next wake-up.
*/
fun resume() {
mutex.locked {
// The previous executor was shut down by stop(), so a new one is needed.
schedulerTimerExecutor = Executors.newSingleThreadExecutor()
// The map must be repopulated before the queue, because the queue is filled from the map's values.
scheduledStates.putAll(createMap())
scheduledStatesQueue.addAll(scheduledStates.values)
rescheduleWakeUp()
}
}
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
val previousState = scheduledStates[action.ref]
@ -210,7 +233,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
private var schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new

View File

@ -5,6 +5,7 @@ import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
@ -357,6 +358,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val shutdownLatch = CountDownLatch(1)
/**
 * Future that is completed with [Unit] at the point where the event loop flags itself as running, and that is
 * replaced by a new, uncompleted future when the client is stopped.
 *
 * NOTE(review): callers appear to wait on this from a thread other than the one running the event loop, and the
 * property is a plain `var` - confirm that the reassignment is safely visible across threads.
 */
var runningFuture = openFuture<Unit>()
/**
* Starts the p2p event loop: this method only returns once [stop] has been called.
*/
@ -367,6 +370,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
runningFuture.set(Unit)
// If it's null, it means we already called stop, so return immediately.
if (p2pConsumer == null) {
return
@ -479,6 +483,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
check(started)
val prevRunning = running
running = false
runningFuture = openFuture()
networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, {"stop can't be called twice"})
require(producer != null, {"stop can't be called twice"})

View File

@ -30,6 +30,11 @@ class RPCMessagingClient(
rpcServer!!.start(serverControl)
}
/**
 * Restart RPC messaging after a stop.
 *
 * Holding this client's monitor, calls [start] with the given [rpcOps] and [securityManager] and then asks the
 * [securityManager] to resume.
 */
fun resume(rpcOps: RPCOps, securityManager: RPCSecurityManager) {
    synchronized(this) {
        start(rpcOps, securityManager)
        securityManager.resume()
    }
}
fun stop() = synchronized(this) {
rpcServer?.close()
artemis.stop()

View File

@ -138,6 +138,19 @@ class MultiThreadedStateMachineManager(
lifeCycle.transition(State.UNSTARTED, State.STARTED)
}
/**
 * Resume this state machine manager after it has been stopped.
 *
 * Requires the life cycle to be in [State.STOPPED]. Restarts the fiber deserialization checker (when present),
 * restores flows from their checkpoints, installs a default uncaught-exception handler that logs through the
 * failing flow's logger, registers a callback on the network map cache's `nodeReady` future that resumes the
 * restored flows, and finally moves the life cycle from [State.STOPPED] to [State.STARTED].
 */
override fun resume() {
    lifeCycle.requireState(State.STOPPED)
    fiberDeserializationChecker?.start(checkpointSerializationContext!!)
    val restoredFibers = restoreFlowsFromCheckpoints()
    Fiber.setDefaultUncaughtExceptionHandler { strand, cause ->
        val flowFiber = strand as FlowStateMachineImpl<*>
        flowFiber.logger.warn("Caught exception from flow", cause)
    }
    serviceHub.networkMapCache.nodeReady.then { resumeRestoredFlows(restoredFibers) }
    lifeCycle.transition(State.STOPPED, State.STARTED)
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return concurrentBox.content.flows.values.mapNotNull {
flowClass.castIfPossible(it.fiber.logic)?.let { it to it.stateMachine.resultFuture }

View File

@ -134,6 +134,20 @@ class SingleThreadedStateMachineManager(
}
}
/**
 * Resume this state machine manager after it has been stopped.
 *
 * Restarts the fiber deserialization checker (when present), restores flows from their checkpoints, installs a
 * default uncaught-exception handler that logs through the failing flow's logger, registers a callback on the
 * network map cache's `nodeReady` future that resumes the restored flows, and finally clears the `stopping`
 * flag under [mutex].
 */
override fun resume() {
    fiberDeserializationChecker?.start(checkpointSerializationContext!!)
    val restoredFibers = restoreFlowsFromCheckpoints()
    Fiber.setDefaultUncaughtExceptionHandler { strand, cause ->
        val flowFiber = strand as FlowStateMachineImpl<*>
        flowFiber.logger.warn("Caught exception from flow", cause)
    }
    serviceHub.networkMapCache.nodeReady.then { resumeRestoredFlows(restoredFibers) }
    mutex.locked { stopping = false }
}
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked {
flows.values.mapNotNull {

View File

@ -38,6 +38,11 @@ interface StateMachineManager {
*/
fun stop(allowedUnsuspendedFiberCount: Int)
/**
 * Resume the state machine manager after [stop] has been called, so that flows are restored from their
 * checkpoints and run again.
 */
fun resume()
/**
* Starts a new flow.
*

View File

@ -89,6 +89,7 @@ class FiberDeserializationChecker {
/**
 * Stops the checker: queues a [Job.Finish] job, waits for the checker thread (if there is one) to terminate and
 * drops the reference to it.
 *
 * @return the value of `foundUnrestorableFibers` once the checker thread has finished.
 */
fun stop(): Boolean {
jobQueue.add(Job.Finish)
checkerThread?.join()
// Forget the finished thread; presumably this lets the checker be started again later - confirm against start().
checkerThread = null
return foundUnrestorableFibers
}
}

View File

@ -46,7 +46,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
val tempFolder = TemporaryFolder()
private lateinit var defaultNetworkParameters: NetworkParametersCopier
private val nodes = mutableListOf<StartedNode<Node>>()
private val startedNodes = mutableListOf<StartedNode<Node>>()
private val nodeInfos = mutableListOf<NodeInfo>()
init {
@ -64,17 +64,17 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
*/
@After
fun stopAllNodes() {
val shutdownExecutor = Executors.newScheduledThreadPool(nodes.size)
val shutdownExecutor = Executors.newScheduledThreadPool(startedNodes.size)
try {
nodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow()
startedNodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow()
// Wait until ports are released
val portNotBoundChecks = nodes.flatMap {
val portNotBoundChecks = startedNodes.flatMap {
listOf(
it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) },
it.internals.configuration.rpcOptions.address?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }
)
}.filterNotNull()
nodes.clear()
startedNodes.clear()
portNotBoundChecks.transpose().getOrThrow()
} finally {
shutdownExecutor.shutdown()
@ -82,10 +82,10 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
@JvmOverloads
fun startNode(legalName: CordaX500Name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
fun initNode(legalName: CordaX500Name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): Node {
val baseDirectory = baseDirectory(legalName).createDirectories()
val localPort = getFreeLocalPorts("localhost", 3)
val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString()
@ -109,14 +109,24 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
}
defaultNetworkParameters.install(baseDirectory)
val node = InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages).start()
nodes += node
return InProcessNode(parsedConfig, MOCK_VERSION_INFO.copy(platformVersion = platformVersion), cordappPackages)
}
@JvmOverloads
fun startNode(legalName: CordaX500Name,
platformVersion: Int = 1,
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
val node = initNode(legalName,platformVersion, rpcUsers,configOverrides)
val startedNode = node.start()
startedNodes += startedNode
ensureAllNetworkMapCachesHaveAllNodeInfos()
thread(name = legalName.organisation) {
node.internals.run()
node.run()
}
return node
return startedNode
}
protected fun baseDirectory(legalName: CordaX500Name): Path {
@ -124,7 +134,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
}
private fun ensureAllNetworkMapCachesHaveAllNodeInfos() {
val runningNodes = nodes.filter { it.internals.started != null }
val runningNodes = startedNodes.filter { it.internals.started != null }
val runningNodesInfo = runningNodes.map { it.info }
for (node in runningNodes)
for (nodeInfo in runningNodesInfo) {