diff --git a/core/src/main/kotlin/net/corda/core/internal/LifeCycle.kt b/core/src/main/kotlin/net/corda/core/internal/LifeCycle.kt index 4e71271875..e7e08908f7 100644 --- a/core/src/main/kotlin/net/corda/core/internal/LifeCycle.kt +++ b/core/src/main/kotlin/net/corda/core/internal/LifeCycle.kt @@ -39,6 +39,17 @@ class LifeCycle>(initial: S) { } fun requireState(requiredState: S) = requireState(requiredState) {} + fun requireState( + requiredState: S, + throwable: Throwable, + block: () -> A + ): A { + return lock.readLock().withLock { + if (requiredState != state) { throw throwable } + block() + } + } + /** Assert something about the current state atomically. */ fun requireState( errorMessage: (S) -> String, diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b686c2fcaa..f883606ee8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -50,6 +50,7 @@ import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.exceptions.StateMachineStoppedException import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.context @@ -185,7 +186,11 @@ internal class CordaRPCOpsImpl( if (isFlowsDrainingModeEnabled()) { throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.") } - return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow() + try { + return flowStarter.invokeFlowAsync(logicType, context(), *args).getOrThrow() + } catch (e: StateMachineStoppedException) { + throw RejectedCommandException("Node is shutting down. Cannot start new flows through RPC.") + } } override fun attachmentExists(id: SecureHash): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/internal/exceptions/StateMachineStoppedException.kt b/node/src/main/kotlin/net/corda/node/internal/exceptions/StateMachineStoppedException.kt new file mode 100644 index 0000000000..88a5bcb31a --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/exceptions/StateMachineStoppedException.kt @@ -0,0 +1,7 @@ +package net.corda.node.internal.exceptions + +import net.corda.core.CordaRuntimeException + +class StateMachineStoppedException(message: String, cause: Throwable?) : CordaRuntimeException(message, cause) { + constructor(msg: String) : this(msg, null) +} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index eaeee67788..8aa56cf475 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -34,6 +34,7 @@ import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.internal.exceptions.StateMachineStoppedException import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints @@ -196,7 +197,7 @@ class MultiThreadedStateMachineManager( ourIdentity: Party?, deduplicationHandler: DeduplicationHandler? ): CordaFuture> { - return lifeCycle.requireState(State.STARTED) { + return lifeCycle.requireState(State.STARTED, StateMachineStoppedException("Flow cannot be started. State machine is stopped.")) { startFlowInternal( invocationContext = context, flowLogic = flowLogic,