mirror of
synced 2025-02-20 17:33:15 +00:00
ENT-6376 Don't hospitalize session end in ReceiveFinalityFlow
Do not keep a flow in for observation if it receives an unexpected session end message while in `ReceiveFinalityFlow` and `ReceiveTransactionFlow` (due to being called by the former). This is done by checking the message of the `UnexpectedFlowEndException` that is thrown when a session end message instead of a data message and if the stacktrace has `ReceiveTransactionFlow` at the top, after removing statemachine stack frames. Checking the stacktrace for `ReceiveTransactionFlow` is important because the unexpected session end session message is only ok if a transaction has not already been received. For example, if `ResolveTransactionsFlow` is in the stack, then this indicates failure when receiving transaction dependencies on a transaction that should be recorded. Also added a test that highlights that the `UnexpectedFlowEndException` caused by the session end message can be caught, therefore users can determine their own behaviour if desired.
This commit is contained in:
@ -4,6 +4,8 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.CollectSignaturesFlow
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
@ -12,20 +14,29 @@ import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.SignTransactionFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContract.SingleOwnerState
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
@ -33,6 +44,7 @@ import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.findCordapp
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Before
import org.junit.Test
import java.sql.SQLException
import java.util.*
@ -47,6 +59,12 @@ class FlowHospitalTest {
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
fun before() {
SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow = false
CreateTransactionButDontFinalizeResponderFlow.exceptionSeenInUserFlow = false
@Test(timeout = 300_000)
fun `when double spend occurs, the flow is successfully deleted on the counterparty`() {
driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts")))) {
@ -172,7 +190,7 @@ class FlowHospitalTest {
@Test(timeout = 300_000)
fun `HospitalizeFlowException cloaking an important exception thrown`() {
var dischargedCounter = 0
var observationCounter: Int = 0
var observationCounter = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
@ -197,6 +215,84 @@ class FlowHospitalTest {
@Test(timeout = 300_000)
fun `catching a notary error will cause a peer to fail with unexpected session end during ReceiveFinalityFlow that passes through user code`() {
var dischargedCounter = 0
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.let {
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
// 1 is the notary failing to notarise and propagating the error
// 2 is the receiving flow failing due to the unexpected session end error
assertEquals(2, dischargedCounter)
@Test(timeout = 300_000)
fun `unexpected session end errors outside of ReceiveFinalityFlow are not handled`() {
var dischargedCounter = 0
var observationCounter = 0
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeCHandle = startNode(providedName = CHARLIE_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.let {
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
val ref2 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
val ref3 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeCHandle.nodeInfo.singleIdentity(), ref2).returnValue.getOrThrow(20.seconds)
it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds)
assertEquals(0, dischargedCounter)
assertEquals(1, observationCounter)
@Test(timeout = 300_000)
fun `unexpected session end errors within ReceiveFinalityFlow can be caught and the flow can end gracefully`() {
var dischargedCounter = 0
var observationCounter = 0
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
nodeAHandle.rpc.let {
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref, true).returnValue.getOrThrow(20.seconds)
// 1 is the notary failing to notarise and propagating the error
assertEquals(1, dischargedCounter)
assertEquals(0, observationCounter)
class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<SingleOwnerState>>() {
@ -296,4 +392,136 @@ class FlowHospitalTest {
class CreateTransactionFlow(private val peer: Party) : FlowLogic<StateAndRef<DummyState>>() {
override fun call(): StateAndRef<DummyState> {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyContract.Commands.Create(), listOf(ourIdentity.owningKey, peer.owningKey))
val stx = serviceHub.signInitialTransaction(tx)
val session = initiateFlow(peer)
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
subFlow(FinalityFlow(ftx, session))
return ftx.coreTransaction.outRef(0)
class CreateTransactionResponderFlow(private val session: FlowSession) : FlowLogic<Unit>() {
override fun call() {
val stx = subFlow(object : SignTransactionFlow(session) {
override fun checkTransaction(stx: SignedTransaction) {
logger.info("CREATE TX - SIGNED TO SIGN TX")
subFlow(ReceiveFinalityFlow(session, stx.id))
logger.info("CREATE TX - RECEIVED TX")
class SpendStateAndCatchDoubleSpendFlow(
private val peer: Party,
private val ref: StateAndRef<DummyState>,
private val consumePeerError: Boolean
) : FlowLogic<StateAndRef<DummyState>>() {
constructor(peer: Party, ref: StateAndRef<DummyState>): this(peer, ref, false)
override fun call(): StateAndRef<DummyState> {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey, peer.owningKey))
val stx = serviceHub.signInitialTransaction(tx)
val session = initiateFlow(peer)
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
try {
subFlow(FinalityFlow(ftx, session))
} catch(e: NotaryException) {
logger.info("Caught notary exception")
return ftx.coreTransaction.outRef(0)
class SpendStateAndCatchDoubleSpendResponderFlow(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
var exceptionSeenInUserFlow = false
override fun call() {
val consumeError = session.receive<Boolean>().unwrap { it }
val stx = subFlow(object : SignTransactionFlow(session) {
override fun checkTransaction(stx: SignedTransaction) {
try {
subFlow(ReceiveFinalityFlow(session, stx.id))
} catch (e: UnexpectedFlowEndException) {
exceptionSeenInUserFlow = true
if (!consumeError) {
throw e
class CreateTransactionButDontFinalizeFlow(private val peer: Party, private val ref: StateAndRef<DummyState>) : FlowLogic<Unit>() {
override fun call() {
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(DummyState(participants = listOf(ourIdentity)))
addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey))
val stx = serviceHub.signInitialTransaction(tx)
val session = initiateFlow(peer)
// Send the transaction id to the peer instead of the transaction.
// This allows transaction dependency resolution to occur within the peer's [ReceiveTransactionFlow].
// Mimic notarisation from [FinalityFlow] so that failing inside [ResolveTransactionsFlow] can be achieved.
val notarySignatures = subFlow(NotaryFlow.Client(stx, skipVerification = true))
val notarisedTx = stx + notarySignatures
class CreateTransactionButDontFinalizeResponderFlow(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
var exceptionSeenInUserFlow = false
override fun call() {
val id = session.receive<SecureHash>().unwrap { it }
try {
subFlow(ReceiveFinalityFlow(session, id))
} catch (e: UnexpectedFlowEndException) {
exceptionSeenInUserFlow = true
throw e
@ -11,6 +11,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.VisibleForTesting
@ -21,6 +22,7 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.FinalityHandler
import net.corda.node.services.statemachine.transitions.StartedFlowTransition
import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.io.Closeable
@ -29,10 +31,9 @@ import java.sql.SQLTransientConnectionException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.Timer
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException
import kotlin.collections.HashMap
import kotlin.concurrent.timerTask
import kotlin.math.pow
@ -485,13 +486,22 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
"the flow by re-starting the node. State machine state: $currentState", newError)
} else if (isFromReceiveFinalityFlow(newError)) {
if (isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinality(newError)) {
// no need to keep around the flow, since notarisation has already failed at the counterparty.
} else {
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState", newError)
when {
isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveTransactionFlow(newError) -> {
// no need to keep around the flow, since notarisation has already failed at the counterparty.
isEndSessionErrorThrownDuringReceiveTransactionFlow(newError) -> {
// Typically occurs if the initiating flow catches a notary exception and ends their flow successfully.
else -> {
"Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState", newError
} else {
@ -523,13 +533,26 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
* This is because in the latter case, the transaction might have already been finalised and deleting the flow
* would introduce risk for inconsistency between nodes.
private fun isErrorThrownDuringReceiveFinality(error: Throwable): Boolean {
private fun isErrorThrownDuringReceiveTransactionFlow(error: Throwable): Boolean {
val strippedStacktrace = error.stackTrace
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
return strippedStacktrace.isNotEmpty()
&& strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!)
* Checks if an end session error exception was thrown and that it did so within [ReceiveTransactionFlow].
* The check for [ReceiveTransactionFlow] is important to ensure that the session didn't end within [ResolveTransactionsFlow] which
* implies that it has been receiving the transaction's dependencies and therefore ending before receiving the whole transaction
* is incorrect behaviour.
private fun isEndSessionErrorThrownDuringReceiveTransactionFlow(error: Throwable): Boolean {
return error is UnexpectedFlowEndException
&& error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true
&& isErrorThrownDuringReceiveTransactionFlow(error)
@ -28,6 +28,7 @@ class StartedFlowTransition(
companion object {
private val logger: Logger = contextLogger()
const val UNEXPECTED_SESSION_END_MESSAGE = "Received session end message instead of a data session message. Mismatched send and receive?"
override fun transition(): TransitionResult {
@ -253,7 +254,7 @@ class StartedFlowTransition(
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toArrayList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?")
throw UnexpectedFlowEndException(UNEXPECTED_SESSION_END_MESSAGE)
} else {
(messages[0] as DataSessionMessage).payload
Reference in New Issue
Block a user