CORDA-1099: Orchestrated clean shutdown from Shell (#2831)

This commit is contained in:
Michele Sollecito
2018-03-19 14:20:10 +00:00
committed by GitHub
parent c964e50696
commit 7a077e76f0
35 changed files with 680 additions and 304 deletions

View File

@ -1,6 +1,7 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.internal.drainAndShutdown
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
@ -13,11 +14,13 @@ import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.User
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
@ -49,6 +52,7 @@ class P2PFlowsDrainingModeTest {
fun `flows draining mode suspends consumption of initial session messages`() {
driver(DriverParameters(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation)) {
val initiatedNode = startNode().getOrThrow()
val initiating = startNode(rpcUsers = users).getOrThrow().rpc
val counterParty = initiatedNode.nodeInfo.singleIdentity()
@ -76,27 +80,54 @@ class P2PFlowsDrainingModeTest {
}
}
@StartableByRPC
@InitiatingFlow
class InitiateSessionFlow(private val counterParty: Party) : FlowLogic<String>() {
@Test
fun `clean shutdown by draining`() {
@Suspendable
override fun call(): String {
driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation)) {
val session = initiateFlow(counterParty)
session.send("Hi there")
return session.receive<String>().unwrap { it }
val nodeA = startNode(rpcUsers = users).getOrThrow()
val nodeB = startNode(rpcUsers = users).getOrThrow()
var successful = false
val latch = CountDownLatch(1)
nodeB.rpc.setFlowsDrainingModeEnabled(true)
IntRange(1, 10).forEach { nodeA.rpc.startFlow(::InitiateSessionFlow, nodeB.nodeInfo.chooseIdentity()) }
nodeA.rpc.drainAndShutdown()
.doOnError { error ->
error.printStackTrace()
successful = false
}
.doOnCompleted { successful = true }
.doAfterTerminate { latch.countDown() }
.subscribe()
nodeB.rpc.setFlowsDrainingModeEnabled(false)
latch.await()
assertThat(successful).isTrue()
}
}
}
@InitiatedBy(InitiateSessionFlow::class)
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
@StartableByRPC
@InitiatingFlow
class InitiateSessionFlow(private val counterParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call() {
@Suspendable
override fun call(): String {
val message = initiatingSession.receive<String>().unwrap { it }
initiatingSession.send("$message answer")
}
val session = initiateFlow(counterParty)
session.send("Hi there")
return session.receive<String>().unwrap { it }
}
}
@InitiatedBy(InitiateSessionFlow::class)
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val message = initiatingSession.receive<String>().unwrap { it }
initiatingSession.send("$message answer")
}
}

View File

@ -86,6 +86,7 @@ import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.collections.set
import kotlin.reflect.KClass
@ -145,6 +146,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected lateinit var networkMapUpdater: NetworkMapUpdater
lateinit var securityManager: RPCSecurityManager
private val shutdownExecutor = Executors.newSingleThreadExecutor()
/** Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database */
val nodeReadyFuture: CordaFuture<Unit> get() = _nodeReadyFuture
@ -159,7 +162,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter)
return SecureCordaRPCOps(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } })
}
private fun initCertificate() {
@ -712,6 +716,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
toRun()
}
runOnStop.clear()
shutdownExecutor.shutdown()
_started = null
}

View File

@ -45,7 +45,8 @@ internal class CordaRPCOpsImpl(
private val services: ServiceHubInternal,
private val smm: StateMachineManager,
private val database: CordaPersistence,
private val flowStarter: FlowStarter
private val flowStarter: FlowStarter,
private val shutdownNode: () -> Unit
) : CordaRPCOps {
override fun networkMapSnapshot(): List<NodeInfo> {
val (snapshot, updates) = networkMapFeed()
@ -298,6 +299,10 @@ internal class CordaRPCOpsImpl(
return services.nodeProperties.flowsDrainingMode.isEnabled()
}
override fun shutdown() {
shutdownNode.invoke()
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
}

View File

@ -167,6 +167,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled)
override fun shutdown() = guard("shutdown", implementation::shutdown)
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action)

View File

@ -14,7 +14,8 @@ class SecureCordaRPCOps(services: ServiceHubInternal,
smm: StateMachineManager,
database: CordaPersistence,
flowStarter: FlowStarter,
val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) {
shutdownNode: () -> Unit,
val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, shutdownNode)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) {
/**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed

View File

@ -82,7 +82,7 @@ class CordaRPCOpsImplTest {
fun setup() {
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset"))
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services)
rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services, { })
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
mockNet.runNetwork()