From f37638c93db95b87dc90844da2431130e2461754 Mon Sep 17 00:00:00 2001 From: Dimos Raptis Date: Wed, 16 Oct 2019 09:37:28 +0100 Subject: [PATCH] [CORDA-3122] - Cleanup non-finalised, errored flows (#5594) * [CORDA-3122] - Cleanup non-finalised, errored flows * detekt --- .../core/flows/ReceiveTransactionFlow.kt | 3 + .../services/statemachine/FlowHospitalTest.kt | 165 ++++++++++++++++++ .../statemachine/StaffedFlowHospital.kt | 45 ++++- 3 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt index b45f70d767..413f01db3f 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt @@ -21,6 +21,9 @@ import java.security.SignatureException * Please note that it will *not* store the transaction to the vault unless that is explicitly requested and checkSufficientSignatures is true. * Setting statesToRecord to anything else when checkSufficientSignatures is false will *not* update the vault. * + * Attention: At the moment, this flow receives a [SignedTransaction] first thing and then proceeds by invoking a [ResolveTransactionsFlow] subflow. + * This is used as a criterion to identify cases where a counterparty has failed notarising a transaction. + * * @property otherSideSession session to the other side which is calling [SendTransactionFlow]. * @property checkSufficientSignatures if true checks all required signatures are present. See [SignedTransaction.verify]. * @property statesToRecord which transaction states should be recorded in the vault, if any. 
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt
new file mode 100644
index 0000000000..fcd66478e6
--- /dev/null
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt
@@ -0,0 +1,165 @@
+package net.corda.node.services.statemachine
+
+import co.paralleluniverse.fibers.Suspendable
+import net.corda.client.rpc.CordaRPCClient
+import net.corda.core.contracts.PartyAndReference
+import net.corda.core.contracts.StateAndRef
+import net.corda.core.flows.FinalityFlow
+import net.corda.core.flows.FlowException
+import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowSession
+import net.corda.core.flows.InitiatedBy
+import net.corda.core.flows.InitiatingFlow
+import net.corda.core.flows.NotaryException
+import net.corda.core.flows.ReceiveFinalityFlow
+import net.corda.core.flows.StartableByRPC
+import net.corda.core.identity.Party
+import net.corda.core.messaging.StateMachineUpdate
+import net.corda.core.messaging.startFlow
+import net.corda.core.utilities.OpaqueBytes
+import net.corda.core.utilities.getOrThrow
+import net.corda.node.services.Permissions
+import net.corda.testing.contracts.DummyContract
+import net.corda.testing.contracts.DummyContract.SingleOwnerState
+import net.corda.testing.core.ALICE_NAME
+import net.corda.testing.core.CHARLIE_NAME
+import net.corda.testing.driver.driver
+import net.corda.testing.node.User
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.Test
+import java.util.Random
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+class FlowHospitalTest {
+
+    private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
+
+    @Test
+    fun `when double spend occurs, the flow is successfully deleted on the counterparty`() {
+        driver {
+            val charlie = startNode(providedName = CHARLIE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
+            val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
+
+            val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
+            val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
+
+            val aliceParty = aliceClient.nodeInfo().legalIdentities.first()
+
+            val (firstLatch, secondLatch) = arrayOf(CountDownLatch(1), CountDownLatch(1))
+
+            // case 1: the notary exception is not caught
+            val stateAndRef = charlieClient.startFlow(::IssueFlow, defaultNotaryIdentity).returnValue.get()
+            charlieClient.startFlow(::SpendFlow, stateAndRef, aliceParty).returnValue.get()
+
+            val firstSubscription = aliceClient.stateMachinesFeed().updates.subscribe {
+                if (it is StateMachineUpdate.Removed && it.result.isFailure)
+                    firstLatch.countDown()
+            }
+
+            assertThatThrownBy {
+                charlieClient.startFlow(::SpendFlow, stateAndRef, aliceParty).returnValue.getOrThrow()
+            }.isInstanceOf(NotaryException::class.java)
+
+            assertThat(firstLatch.await(5, TimeUnit.SECONDS)).isTrue()
+            firstSubscription.unsubscribe()
+            assertThat(aliceClient.stateMachinesSnapshot()).isEmpty()
+
+            // case 2: the notary exception is caught and wrapped in a custom exception
+            val secondStateAndRef = charlieClient.startFlow(::IssueFlow, defaultNotaryIdentity).returnValue.get()
+            charlieClient.startFlow(::SpendFlowWithCustomException, secondStateAndRef, aliceParty).returnValue.get()
+
+            val secondSubscription = aliceClient.stateMachinesFeed().updates.subscribe {
+                if (it is StateMachineUpdate.Removed && it.result.isFailure)
+                    secondLatch.countDown()
+            }
+
+            assertThatThrownBy {
+                charlieClient.startFlow(::SpendFlowWithCustomException, secondStateAndRef, aliceParty).returnValue.getOrThrow()
+            }.isInstanceOf(DoubleSpendException::class.java)
+
+            assertThat(secondLatch.await(5, TimeUnit.SECONDS)).isTrue()
+            secondSubscription.unsubscribe()
+            assertThat(aliceClient.stateMachinesSnapshot()).isEmpty()
+        }
+    }
+
+    @StartableByRPC
+    class IssueFlow(val notary: Party): FlowLogic<StateAndRef<SingleOwnerState>>() {
+
+        @Suspendable
+        override fun call(): StateAndRef<SingleOwnerState> {
+            val partyAndReference = PartyAndReference(ourIdentity, OpaqueBytes.of(1))
+            val txBuilder = DummyContract.generateInitial(Random().nextInt(), notary, partyAndReference)
+            val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
+            val notarised = subFlow(FinalityFlow(signedTransaction, emptySet<FlowSession>()))
+            return notarised.coreTransaction.outRef(0)
+        }
+
+    }
+
+    @StartableByRPC
+    @InitiatingFlow
+    class SpendFlow(private val stateAndRef: StateAndRef<SingleOwnerState>, private val newOwner: Party): FlowLogic<Unit>() {
+
+        @Suspendable
+        override fun call() {
+            val txBuilder = DummyContract.move(stateAndRef, newOwner)
+            val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
+            val sessionWithCounterParty = initiateFlow(newOwner)
+            sessionWithCounterParty.sendAndReceive<String>("initial-message")
+            subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
+        }
+
+    }
+
+    @InitiatedBy(SpendFlow::class)
+    class AcceptSpendFlow(private val otherSide: FlowSession): FlowLogic<Unit>() {
+
+        @Suspendable
+        override fun call() {
+            otherSide.receive<String>()
+            otherSide.send("initial-response")
+
+            subFlow(ReceiveFinalityFlow(otherSide))
+        }
+
+    }
+
+    @StartableByRPC
+    @InitiatingFlow
+    class SpendFlowWithCustomException(private val stateAndRef: StateAndRef<SingleOwnerState>, private val newOwner: Party):
+            FlowLogic<Unit>() {
+
+        @Suspendable
+        override fun call() {
+            val txBuilder = DummyContract.move(stateAndRef, newOwner)
+            val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
+            val sessionWithCounterParty = initiateFlow(newOwner)
+            sessionWithCounterParty.sendAndReceive<String>("initial-message")
+            try {
+                subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
+            } catch (e: NotaryException) {
+                throw DoubleSpendException("double spend!", e)
+            }
+        }
+
+    }
+
+    @InitiatedBy(SpendFlowWithCustomException::class)
+    class AcceptSpendFlowWithCustomException(private val otherSide: FlowSession): FlowLogic<Unit>() {
+
+        @Suspendable
+        override fun call() {
+            otherSide.receive<String>()
+            otherSide.send("initial-response")
+
+            subFlow(ReceiveFinalityFlow(otherSide))
+        }
+
+    }
+
+    class DoubleSpendException(message: String, cause: Throwable): FlowException(message, cause)
+
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
index aa1179a3be..476bd0829f 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
@@ -1,9 +1,13 @@
 package net.corda.node.services.statemachine
 
 import net.corda.core.crypto.newSecureRandom
+import net.corda.core.flows.FlowException
 import net.corda.core.flows.ReceiveFinalityFlow
+import net.corda.core.flows.ReceiveTransactionFlow
 import net.corda.core.flows.StateMachineRunId
+import net.corda.core.flows.UnexpectedFlowEndException
 import net.corda.core.identity.Party
+import net.corda.core.internal.DeclaredField
 import net.corda.core.internal.ThreadBox
 import net.corda.core.internal.TimedFlow
 import net.corda.core.internal.bufferUntilSubscribed
@@ -307,10 +311,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
 
     object FinalityDoctor : Staff {
         override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
-            return if (currentState.flowLogic is FinalityHandler || isFromReceiveFinalityFlow(newError)) {
+            return if (currentState.flowLogic is FinalityHandler) {
                 log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
                         "the flow by re-starting the node. State machine state: $currentState", newError)
                 Diagnosis.OVERNIGHT_OBSERVATION
+            } else if (isFromReceiveFinalityFlow(newError)) {
+                if (isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinality(newError)) {
+                    // no need to keep around the flow, since notarisation has already failed at the counterparty.
+                    Diagnosis.NOT_MY_SPECIALTY
+                } else {
+                    log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
+                            "the flow by re-starting the node. State machine state: $currentState", newError)
+                    Diagnosis.OVERNIGHT_OBSERVATION
+                }
             } else {
                 Diagnosis.NOT_MY_SPECIALTY
             }
@@ -319,6 +332,36 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
         private fun isFromReceiveFinalityFlow(throwable: Throwable): Boolean {
             return throwable.stackTrace.any { it.className == ReceiveFinalityFlow::class.java.name }
         }
+
+        private fun isErrorPropagatedFromCounterparty(error: Throwable): Boolean {
+            return when (error) {
+                is UnexpectedFlowEndException -> {
+                    val peer = DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", error).value
+                    peer != null
+                }
+                is FlowException -> {
+                    val peer = DeclaredField<Party?>(FlowException::class.java, "peer", error).value
+                    peer != null
+                }
+                else -> false
+            }
+        }
+
+        /**
+         * This method will return true if [ReceiveTransactionFlow] is at the top of the stack during the error.
+         * As a result, if the failure happened during a sub-flow invoked from [ReceiveTransactionFlow], the method will return false.
+         *
+         * This is because in the latter case, the transaction might have already been finalised and deleting the flow
+         * would introduce risk for inconsistency between nodes.
+         */
+        private fun isErrorThrownDuringReceiveFinality(error: Throwable): Boolean {
+            val strippedStacktrace = error.stackTrace
+                    .filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
+                    .filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
+            return strippedStacktrace.isNotEmpty() &&
+                    strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.java.name)
+        }
+
     }
 
     /**