diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt index c26a106910..139bb89505 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt @@ -4,6 +4,8 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.StateAndRef +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.CollectSignaturesFlow import net.corda.core.flows.FinalityFlow import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic @@ -12,20 +14,29 @@ import net.corda.core.flows.HospitalizeFlowException import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.NotaryException +import net.corda.core.flows.NotaryFlow import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.SignTransactionFlow import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.Party import net.corda.core.internal.concurrent.transpose import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.startFlow +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.core.utilities.unwrap import net.corda.node.services.Permissions import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContract.SingleOwnerState +import net.corda.testing.contracts.DummyState import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver import net.corda.testing.node.User @@ -33,6 +44,7 @@ import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.findCordapp import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Before import org.junit.Test import java.sql.SQLException import java.util.* @@ -47,6 +59,12 @@ class FlowHospitalTest { private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all())) + @Before + fun before() { + SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow = false + CreateTransactionButDontFinalizeResponderFlow.exceptionSeenInUserFlow = false + } + @Test(timeout = 300_000) fun `when double spend occurs, the flow is successfully deleted on the counterparty`() { driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts")))) { @@ -172,7 +190,7 @@ class FlowHospitalTest { @Test(timeout = 300_000) fun `HospitalizeFlowException cloaking an important exception thrown`() { var dischargedCounter = 0 - var observationCounter: Int = 0 + var observationCounter = 0 StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargedCounter } @@ -197,6 +215,84 @@ class FlowHospitalTest { } } + @Test(timeout = 300_000) + fun `catching a notary error will cause a peer to fail with unexpected session end during ReceiveFinalityFlow that passes through user code`() { + var dischargedCounter = 0 + StaffedFlowHospital.onFlowErrorPropagated.add { _, _ -> + ++dischargedCounter + } + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(isDebug = false, startNodesInProcess = true)) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + nodeAHandle.rpc.let { + val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds) + it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) + it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) + } + } + // 1 is the notary failing to notarise and propagating the error + // 2 is the receiving flow failing due to the unexpected session end error + assertEquals(2, dischargedCounter) + assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow) + } + + @Test(timeout = 300_000) + fun `unexpected session end errors outside of ReceiveFinalityFlow are not handled`() { + var dischargedCounter = 0 + var observationCounter = 0 + StaffedFlowHospital.onFlowErrorPropagated.add { _, _ -> + ++dischargedCounter + } + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observationCounter + } + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(isDebug = false, startNodesInProcess = true)) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeCHandle = startNode(providedName = CHARLIE_NAME, rpcUsers = listOf(user)).getOrThrow() + nodeAHandle.rpc.let { + val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds) + val ref2 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) + val ref3 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeCHandle.nodeInfo.singleIdentity(), ref2).returnValue.getOrThrow(20.seconds) + it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds) + } + } + assertEquals(0, dischargedCounter) + assertEquals(1, observationCounter) + assertTrue(CreateTransactionButDontFinalizeResponderFlow.exceptionSeenInUserFlow) + } + + @Test(timeout = 300_000) + fun `unexpected session end errors within ReceiveFinalityFlow can be caught and the flow can end gracefully`() { + var dischargedCounter = 0 + var observationCounter = 0 + StaffedFlowHospital.onFlowErrorPropagated.add { _, _ -> + ++dischargedCounter + } + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observationCounter + } + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(isDebug = false, startNodesInProcess = true)) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + nodeAHandle.rpc.let { + val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds) + it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) + it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref, true).returnValue.getOrThrow(20.seconds) + } + } + // 1 is the notary failing to notarise and propagating the error + assertEquals(1, dischargedCounter) + assertEquals(0, observationCounter) + assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow) + } + @StartableByRPC class IssueFlow(val notary: Party) : FlowLogic>() { @@ -296,4 +392,136 @@ class FlowHospitalTest { setCause(SQLException("deadlock")) } } + + @InitiatingFlow + @StartableByRPC + class CreateTransactionFlow(private val peer: Party) : FlowLogic>() { + @Suspendable + override fun call(): StateAndRef { + val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply { + addOutputState(DummyState(participants = listOf(ourIdentity))) + addCommand(DummyContract.Commands.Create(), listOf(ourIdentity.owningKey, peer.owningKey)) + } + val stx = serviceHub.signInitialTransaction(tx) + val session = initiateFlow(peer) + val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session))) + subFlow(FinalityFlow(ftx, session)) + + return ftx.coreTransaction.outRef(0) + } + } + + @InitiatedBy(CreateTransactionFlow::class) + class CreateTransactionResponderFlow(private val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + logger.info("CREATE TX - WAITING TO SIGN TX") + val stx = subFlow(object : SignTransactionFlow(session) { + override fun checkTransaction(stx: SignedTransaction) { + + } + }) + logger.info("CREATE TX - SIGNED TO SIGN TX") + subFlow(ReceiveFinalityFlow(session, stx.id)) + logger.info("CREATE TX - RECEIVED TX") + } + } + + @InitiatingFlow + @StartableByRPC + class SpendStateAndCatchDoubleSpendFlow( + private val peer: Party, + private val ref: StateAndRef, + private val consumePeerError: Boolean + ) : FlowLogic>() { + + constructor(peer: Party, ref: StateAndRef): this(peer, ref, false) + + @Suspendable + override fun call(): StateAndRef { + val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply { + addInputState(ref) + addOutputState(DummyState(participants = listOf(ourIdentity))) + addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey, peer.owningKey)) + } + val stx = serviceHub.signInitialTransaction(tx) + val session = initiateFlow(peer) + session.send(consumePeerError) + val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session))) + try { + subFlow(FinalityFlow(ftx, session)) + } catch(e: NotaryException) { + logger.info("Caught notary exception") + } + return ftx.coreTransaction.outRef(0) + } + } + + @InitiatedBy(SpendStateAndCatchDoubleSpendFlow::class) + class SpendStateAndCatchDoubleSpendResponderFlow(private val session: FlowSession) : FlowLogic() { + + companion object { + var exceptionSeenInUserFlow = false + } + + @Suspendable + override fun call() { + val consumeError = session.receive().unwrap { it } + val stx = subFlow(object : SignTransactionFlow(session) { + override fun checkTransaction(stx: SignedTransaction) { + + } + }) + try { + subFlow(ReceiveFinalityFlow(session, stx.id)) + } catch (e: UnexpectedFlowEndException) { + exceptionSeenInUserFlow = true + if (!consumeError) { + throw e + } + } + } + } + + @InitiatingFlow + @StartableByRPC + class CreateTransactionButDontFinalizeFlow(private val peer: Party, private val ref: StateAndRef) : FlowLogic() { + + @Suspendable + override fun call() { + val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply { + addInputState(ref) + addOutputState(DummyState(participants = listOf(ourIdentity))) + addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey)) + } + val stx = serviceHub.signInitialTransaction(tx) + val session = initiateFlow(peer) + // Send the transaction id to the peer instead of the transaction. + // This allows transaction dependency resolution to occur within the peer's [ReceiveTransactionFlow]. + session.send(stx.id) + // Mimic notarisation from [FinalityFlow] so that failing inside [ResolveTransactionsFlow] can be achieved. + val notarySignatures = subFlow(NotaryFlow.Client(stx, skipVerification = true)) + val notarisedTx = stx + notarySignatures + session.send(notarisedTx) + } + } + + @InitiatedBy(CreateTransactionButDontFinalizeFlow::class) + class CreateTransactionButDontFinalizeResponderFlow(private val session: FlowSession) : FlowLogic() { + + companion object { + var exceptionSeenInUserFlow = false + } + + @Suspendable + override fun call() { + val id = session.receive().unwrap { it } + try { + subFlow(ReceiveFinalityFlow(session, id)) + } catch (e: UnexpectedFlowEndException) { + exceptionSeenInUserFlow = true + throw e + } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index 519b2bd3d5..311cb5b164 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -11,6 +11,7 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.Party import net.corda.core.internal.DeclaredField +import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.ThreadBox import net.corda.core.internal.TimedFlow import net.corda.core.internal.VisibleForTesting @@ -21,6 +22,7 @@ import net.corda.core.utilities.debug import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import net.corda.node.services.FinalityHandler +import net.corda.node.services.statemachine.transitions.StartedFlowTransition import org.hibernate.exception.ConstraintViolationException import rx.subjects.PublishSubject import java.io.Closeable @@ -29,10 +31,9 @@ import java.sql.SQLTransientConnectionException import java.time.Clock import java.time.Duration import java.time.Instant -import java.util.* +import java.util.Timer import java.util.concurrent.ConcurrentHashMap import javax.persistence.PersistenceException -import kotlin.collections.HashMap import kotlin.concurrent.timerTask import kotlin.math.pow @@ -485,13 +486,22 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, "the flow by re-starting the node. State machine state: $currentState", newError) Diagnosis.OVERNIGHT_OBSERVATION } else if (isFromReceiveFinalityFlow(newError)) { - if (isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinality(newError)) { - // no need to keep around the flow, since notarisation has already failed at the counterparty. - Diagnosis.NOT_MY_SPECIALTY - } else { - log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " + - "the flow by re-starting the node. State machine state: $currentState", newError) - Diagnosis.OVERNIGHT_OBSERVATION + when { + isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveTransactionFlow(newError) -> { + // no need to keep around the flow, since notarisation has already failed at the counterparty. + Diagnosis.NOT_MY_SPECIALTY + } + isEndSessionErrorThrownDuringReceiveTransactionFlow(newError) -> { + // Typically occurs if the initiating flow catches a notary exception and ends their flow successfully. + Diagnosis.NOT_MY_SPECIALTY + } + else -> { + log.warn( + "Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " + + "the flow by re-starting the node. State machine state: $currentState", newError + ) + Diagnosis.OVERNIGHT_OBSERVATION + } } } else { Diagnosis.NOT_MY_SPECIALTY @@ -523,13 +533,26 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, * This is because in the latter case, the transaction might have already been finalised and deleting the flow * would introduce risk for inconsistency between nodes. */ - private fun isErrorThrownDuringReceiveFinality(error: Throwable): Boolean { + private fun isErrorThrownDuringReceiveTransactionFlow(error: Throwable): Boolean { val strippedStacktrace = error.stackTrace .filterNot { it?.className?.contains("counter-flow exception from peer") ?: false } .filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false } return strippedStacktrace.isNotEmpty() && strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!) } + + /** + * Checks if an end session error exception was thrown and that it did so within [ReceiveTransactionFlow]. + * + * The check for [ReceiveTransactionFlow] is important to ensure that the session didn't end within [ResolveTransactionsFlow] which + * implies that it has been receiving the transaction's dependencies and therefore ending before receiving the whole transaction + * is incorrect behaviour. + */ + private fun isEndSessionErrorThrownDuringReceiveTransactionFlow(error: Throwable): Boolean { + return error is UnexpectedFlowEndException + && error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true + && isErrorThrownDuringReceiveTransactionFlow(error) + } } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 2f2b8a167b..79bad802fe 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -28,6 +28,7 @@ class StartedFlowTransition( companion object { private val logger: Logger = contextLogger() + const val UNEXPECTED_SESSION_END_MESSAGE = "Received session end message instead of a data session message. Mismatched send and receive?" } override fun transition(): TransitionResult { @@ -253,7 +254,7 @@ class StartedFlowTransition( newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toArrayList()) // at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message. resultMessages[sessionId] = if (messages[0] is EndSessionMessage) { - throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?") + throw UnexpectedFlowEndException(UNEXPECTED_SESSION_END_MESSAGE) } else { (messages[0] as DataSessionMessage).payload }