mirror of
synced 2025-03-25 13:27:58 +00:00
[CORDA-1941]: Server-side draining node shutdown. (#3909)
This commit is contained in:
@ -3,9 +3,13 @@ package net.corda.client.rpc
import net.corda.core.CordaRuntimeException
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked
* method.
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked method.
open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause) {
constructor(msg: String) : this(msg, null)
* Signals that the underlying [RPCConnection] dropped.
open class ConnectionFailureException(cause: Throwable? = null) : RPCException("Connection failure detected.", cause)
@ -2,11 +2,8 @@ package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.messaging.ClientRpcSslOptions
import rx.Observable
/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
fun createCordaRPCClientWithSslAndClassLoader(
@ -14,14 +11,4 @@ fun createCordaRPCClientWithSslAndClassLoader(
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
fun CordaRPCOps.drainAndShutdown(): Observable<Unit> {
return pendingFlowsCount().updates
.doOnError { error ->
throw error
.doOnCompleted { shutdown() }.map { }
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
@ -7,6 +7,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion
@ -552,7 +553,7 @@ class RPCClientProxyHandler(
m.keys.forEach { k ->
observationExecutorPool.run(k) {
try {
m[k]?.onError(RPCException("Connection failure detected."))
} catch (th: Throwable) {
log.error("Unexpected exception when RPC connection failure handling", th)
@ -561,7 +562,7 @@ class RPCClientProxyHandler(
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(RPCException("Connection failure detected."))
@ -22,7 +22,6 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import rx.Observable
import rx.subjects.PublishSubject
import java.io.IOException
import java.io.InputStream
import java.security.PublicKey
@ -405,38 +404,20 @@ interface CordaRPCOps : RPCOps {
* This does not wait for flows to be completed.
fun shutdown()
* Returns a [DataFeed] that keeps track on the count of pending flows.
fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
* Shuts the node down. Returns immediately.
* @param drainPendingFlows whether the node will wait for pending flows to be completed before exiting. While draining, new flows from RPC will be rejected.
fun terminate(drainPendingFlows: Boolean = false)
val stateMachineState = stateMachinesFeed()
var pendingFlowsCount = stateMachineState.snapshot.size
var completedFlowsCount = 0
val updates = PublishSubject.create<Pair<Int, Int>>()
.doOnNext { update ->
when (update) {
is StateMachineUpdate.Added -> {
updates.onNext(completedFlowsCount to pendingFlowsCount)
is StateMachineUpdate.Removed -> {
updates.onNext(completedFlowsCount to pendingFlowsCount)
if (completedFlowsCount == pendingFlowsCount) {
if (pendingFlowsCount == 0) {
return DataFeed(pendingFlowsCount, updates)
* Returns whether the node is waiting for pending flows to complete before shutting down.
* Disabling draining mode cancels this state.
* @return whether the node will shutdown when the pending flows count reaches zero.
fun isWaitingForShutdown(): Boolean
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),
@ -0,0 +1,52 @@
package net.corda.nodeapi.internal
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineUpdate
import rx.Observable
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
import java.util.concurrent.TimeUnit
* Returns a [DataFeed] of the number of pending flows. The [Observable] for the updates will complete the moment all pending flows will have terminated.
fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
val updates = PublishSubject.create<Pair<Int, Int>>()
val initialPendingFlowsCount = stateMachinesFeed().let {
var completedFlowsCount = 0
var pendingFlowsCount = it.snapshot.size
it.updates.observeOn(Schedulers.io()).subscribe({ update ->
when (update) {
is StateMachineUpdate.Added -> {
updates.onNext(completedFlowsCount to pendingFlowsCount)
is StateMachineUpdate.Removed -> {
updates.onNext(completedFlowsCount to pendingFlowsCount)
if (completedFlowsCount == pendingFlowsCount) {
}, updates::onError)
if (pendingFlowsCount == 0) {
return DataFeed(initialPendingFlowsCount, updates)
* Returns an [Observable] that will complete when the node will have cancelled the draining shutdown hook.
* @param interval the value of the polling interval, default is 5.
* @param unit the time unit of the polling interval, default is [TimeUnit.SECONDS].
fun CordaRPCOps.hasCancelledDrainingShutdown(interval: Long = 5, unit: TimeUnit = TimeUnit.SECONDS): Observable<Unit> {
return Observable.interval(interval, unit).map { isWaitingForShutdown() }.takeFirst { waiting -> waiting == false }.map { Unit }
@ -1,15 +1,20 @@
package net.corda.node.modes.draining
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.internal.drainAndShutdown
import net.corda.core.flows.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.nodeapi.internal.hasCancelledDrainingShutdown
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
@ -18,10 +23,12 @@ import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.driver
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.User
import net.corda.testing.node.internal.waitForShutdown
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
@ -80,24 +87,74 @@ class P2PFlowsDrainingModeTest {
fun `clean shutdown by draining`() {
driver(DriverParameters(startNodesInProcess = true, portAllocation = portAllocation, notarySpecs = emptyList())) {
fun `terminate node waiting for pending flows`() {
driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
var successful = false
val latch = CountDownLatch(1)
IntRange(1, 10).forEach { nodeA.rpc.startFlow(::InitiateSessionFlow, nodeB.nodeInfo.chooseIdentity()) }
.doOnError { error ->
successful = false
.doOnCompleted { successful = true }
.doAfterTerminate { latch.countDown() }
nodeA.waitForShutdown().doOnError(Throwable::printStackTrace).doOnError { successful = false }.doOnCompleted { successful = true }.doAfterTerminate(latch::countDown).subscribe()
fun `terminate resets persistent draining mode property when waiting for pending flows`() {
driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
var successful = false
val latch = CountDownLatch(1)
// This would not be needed, as `terminate(true)` sets draining mode anyway, but it's here to ensure that it removes the persistent value anyway.
nodeA.rpc.waitForShutdown().doOnError(Throwable::printStackTrace).doOnError { successful = false }.doOnCompleted(nodeA::stop).doOnCompleted {
val nodeARestarted = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
successful = !nodeARestarted.rpc.isFlowsDrainingModeEnabled()
fun `disabling draining mode cancels draining shutdown`() {
driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
var successful = false
val latch = CountDownLatch(1)
IntRange(1, 10).forEach { nodeA.rpc.startFlow(::InitiateSessionFlow, nodeB.nodeInfo.chooseIdentity()) }
nodeA.waitForShutdown().doOnError(Throwable::printStackTrace).doAfterTerminate { successful = false }.doAfterTerminate(latch::countDown).subscribe()
nodeA.rpc.hasCancelledDrainingShutdown().doOnError(Throwable::printStackTrace).doOnError { successful = false }.doOnCompleted { successful = true }.doAfterTerminate(latch::countDown).subscribe()
@ -238,7 +238,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(): CordaRPCOps {
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit { stop() } }
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit { stop() } }.also { it.closeOnStop() }
val proxies = mutableListOf<(CordaRPCOps) -> CordaRPCOps>()
// Mind that order is relevant here.
proxies += ::AuthenticatedRpcOpsProxy
@ -28,17 +28,21 @@ import net.corda.core.node.services.vault.*
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.NonRpcFlowException
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.nodeapi.internal.pendingFlowsCount
import rx.Observable
import rx.Subscription
import java.io.InputStream
import java.net.ConnectException
import java.security.PublicKey
import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
@ -49,7 +53,24 @@ internal class CordaRPCOpsImpl(
private val smm: StateMachineManager,
private val flowStarter: FlowStarter,
private val shutdownNode: () -> Unit
) : CordaRPCOps {
) : CordaRPCOps, AutoCloseable {
private companion object {
private val logger = loggerFor<CordaRPCOpsImpl>()
private val drainingShutdownHook = AtomicReference<Subscription?>()
init {
services.nodeProperties.flowsDrainingMode.values.filter { it.isDisabled() }.subscribe({
}, {
// Nothing to do in case of errors here.
private fun Pair<Boolean, Boolean>.isDisabled(): Boolean = first && !second
* Returns the RPC protocol version, which is the same the node's platform Version. Exists since version 1 so guaranteed
* to be present.
@ -222,7 +243,7 @@ internal class CordaRPCOpsImpl(
return services.networkMapCache.getNodeByLegalIdentity(party)
override fun registeredFlows(): List<String> = services.rpcFlows.map { it.name }.sorted()
override fun registeredFlows(): List<String> = services.rpcFlows.asSequence().map(Class<*>::getName).sorted().toList()
override fun clearNetworkMapCache() {
@ -271,18 +292,46 @@ internal class CordaRPCOpsImpl(
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
override fun setFlowsDrainingModeEnabled(enabled: Boolean) {
override fun setFlowsDrainingModeEnabled(enabled: Boolean) = setPersistentDrainingModeProperty(enabled, propagateChange = true)
override fun isFlowsDrainingModeEnabled() = services.nodeProperties.flowsDrainingMode.isEnabled()
override fun shutdown() = terminate(false)
override fun terminate(drainPendingFlows: Boolean) {
if (drainPendingFlows) {
logger.info("Waiting for pending flows to complete before shutting down.")
drainingShutdownHook.set(pendingFlowsCount().updates.doOnNext {(completed, total) ->
logger.info("Pending flows progress before shutdown: $completed / $total.")
}.doOnCompleted { setPersistentDrainingModeProperty(false, false) }.doOnCompleted(::cancelDrainingShutdownHook).doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") }.doOnCompleted(shutdownNode::invoke).subscribe({
// Nothing to do on each update here, only completion matters.
}, { error ->
logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error)
} else {
override fun isFlowsDrainingModeEnabled(): Boolean {
return services.nodeProperties.flowsDrainingMode.isEnabled()
override fun isWaitingForShutdown() = drainingShutdownHook.get() != null
override fun close() {
override fun shutdown() {
private fun cancelDrainingShutdownHook() {
drainingShutdownHook.getAndSet(null)?.let {
logger.info("Cancelled draining shutdown hook.")
private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) = services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange)
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
@ -8,7 +8,7 @@ interface NodePropertiesStore {
interface FlowsDrainingModeOperations {
fun setEnabled(enabled: Boolean)
fun setEnabled(enabled: Boolean, propagateChange: Boolean = true)
fun isEnabled(): Boolean
@ -57,12 +57,13 @@ class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private
override val values = PublishSubject.create<Pair<Boolean, Boolean>>()!!
override fun setEnabled(enabled: Boolean) {
var oldValue: Boolean? = null
persistence.transaction {
oldValue = map.put(nodeSpecificFlowsExecutionModeKey, enabled.toString())?.toBoolean() ?: false
override fun setEnabled(enabled: Boolean, propagateChange: Boolean) {
val oldValue = persistence.transaction {
map.put(nodeSpecificFlowsExecutionModeKey, enabled.toString())?.toBoolean() ?: false
if (propagateChange) {
values.onNext(oldValue to enabled)
values.onNext(oldValue!! to enabled)
override fun isEnabled(): Boolean {
@ -54,8 +54,10 @@ import net.corda.testing.node.User
import net.corda.testing.node.internal.DriverDSLImpl.Companion.cordappsInCurrentAndAdditionalPackages
import okhttp3.OkHttpClient
import okhttp3.Request
import rx.Observable
import rx.Subscription
import rx.schedulers.Schedulers
import rx.subjects.AsyncSubject
import java.lang.management.ManagementFactory
import java.net.ConnectException
import java.net.URL
@ -1,5 +1,6 @@
package net.corda.testing.node.internal
import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
@ -8,17 +9,21 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.times
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.Message
import net.corda.testing.driver.NodeHandle
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.User
import net.corda.testing.node.testContext
import org.slf4j.LoggerFactory
import rx.Observable
import rx.subjects.AsyncSubject
import java.net.Socket
import java.net.SocketException
import java.time.Duration
@ -108,4 +113,22 @@ fun StartedNodeServices.newContext(): InvocationContext = testContext(myInfo.cho
fun InMemoryMessagingNetwork.MessageTransfer.getMessage(): Message = message
fun CordaRPCClient.start(user: User) = start(user.username, user.password)
fun CordaRPCClient.start(user: User) = start(user.username, user.password)
fun NodeHandle.waitForShutdown(): Observable<Unit> {
return rpc.waitForShutdown().doAfterTerminate(::stop)
fun CordaRPCOps.waitForShutdown(): Observable<Unit> {
val completable = AsyncSubject.create<Unit>()
stateMachinesFeed().updates.subscribe({ _ -> }, { error ->
if (error is ConnectionFailureException) {
} else {
return completable
@ -44,7 +44,7 @@ public class RunShellCommand extends InteractiveShellCommand {
emitHelp(context, parser);
return null;
return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper(), isSsh());
return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper());
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {
@ -18,6 +18,7 @@ import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.*
import net.corda.nodeapi.internal.pendingFlowsCount
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext
@ -408,7 +409,7 @@ object InteractiveShell {
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, om: ObjectMapper, isSsh: Boolean = false): Any? {
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, om: ObjectMapper): Any? {
val cmd = input.joinToString(" ").trim { it <= ' ' }
if (cmd.startsWith("startflow", ignoreCase = true)) {
// The flow command provides better support and startFlow requires special handling anyway due to
@ -417,7 +418,7 @@ object InteractiveShell {
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow)
return null
} else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) {
return InteractiveShell.gracefulShutdown(out, cordaRPCOps, isSsh)
return InteractiveShell.gracefulShutdown(out, cordaRPCOps)
var result: Any? = null
@ -456,9 +457,8 @@ object InteractiveShell {
return result
fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps, isSsh: Boolean = false) {
fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) {
fun display(statements: RenderPrintWriter.() -> Unit) {
@ -467,40 +467,48 @@ object InteractiveShell {
var isShuttingDown = false
try {
display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") }
isShuttingDown = true
display {
println("Orchestrating a clean shutdown...")
println("...enabling draining mode")
display {
println("...waiting for in-flight flows to be completed")
.doOnError { error ->
throw error
.doOnNext { (first, second) ->
display {
println("...remaining: ${first}/${second}")
val latch = CountDownLatch(1)
cordaRPCOps.pendingFlowsCount().updates.doOnError { error ->
throw error
// For each update.
{ (first, second) -> display { println("...remaining: $first / $second") } },
// On error.
{ error ->
if (!isShuttingDown) {
display { println("RPC failed: ${error.rootCause}", Color.red) }
.doOnCompleted {
if (isSsh) {
// print in the original Shell process
System.out.println("Shutting down the node via remote SSH session (it may take a while)")
display {
println("Shutting down the node (it may take a while)")
isShuttingDown = true
// When completed.
display {
println("...done, quitting standalone shell now.")
// This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
while (!Thread.currentThread().isInterrupted) {
try {
} catch (e: InterruptedException) {
try {
display { println("...cancelled clean shutdown.") }
} finally {
} catch (e: StringToMethodCallParser.UnparseableCallException) {
display {
println(e.message, Color.red)
@ -508,9 +516,7 @@ object InteractiveShell {
} catch (e: Exception) {
if (!isShuttingDown) {
display {
println("RPC failed: ${e.rootCause}", Color.red)
display { println("RPC failed: ${e.rootCause}", Color.red) }
} finally {
InputStreamSerializer.invokeContext = null
Reference in New Issue
Block a user