[CORDA-3122] - Cleanup non-finalised, errored flows (#5594)

* [CORDA-3122] - Cleanup non-finalised, errored flows

* detekt
This commit is contained in:
Dimos Raptis 2019-10-16 09:37:28 +01:00 committed by Jonathan Locke
parent ee09cd8762
commit f37638c93d
3 changed files with 212 additions and 1 deletions

View File

@ -21,6 +21,9 @@ import java.security.SignatureException
* Please note that it will *not* store the transaction to the vault unless that is explicitly requested and checkSufficientSignatures is true.
* Setting statesToRecord to anything else when checkSufficientSignatures is false will *not* update the vault.
*
* Attention: At the moment, this flow receives a [SignedTransaction] first thing and then proceeds by invoking a [ResolveTransactionsFlow] subflow.
* This is used as a criterion to identify cases, where a counterparty has failed notarising a transact
*
* @property otherSideSession session to the other side which is calling [SendTransactionFlow].
* @property checkSufficientSignatures if true checks all required signatures are present. See [SignedTransaction.verify].
* @property statesToRecord which transaction states should be recorded in the vault, if any.

View File

@ -0,0 +1,165 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.contracts.PartyAndReference
import net.corda.core.contracts.StateAndRef
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryException
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.Permissions
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContract.SingleOwnerState
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.util.Random
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class FlowHospitalTest {
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
@Test
fun `when double spend occurs, the flow is successfully deleted on the counterparty`() {
driver {
val charlie = startNode(providedName = CHARLIE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val aliceParty = aliceClient.nodeInfo().legalIdentities.first()
val (firstLatch, secondLatch) = arrayOf(CountDownLatch(1), CountDownLatch(1))
// case 1: the notary exception is not caught
val stateAndRef = charlieClient.startFlow(::IssueFlow, defaultNotaryIdentity).returnValue.get()
charlieClient.startFlow(::SpendFlow, stateAndRef, aliceParty).returnValue.get()
val firstSubscription = aliceClient.stateMachinesFeed().updates.subscribe {
if (it is StateMachineUpdate.Removed && it.result.isFailure)
firstLatch.countDown()
}
assertThatThrownBy {
charlieClient.startFlow(::SpendFlow, stateAndRef, aliceParty).returnValue.getOrThrow()
}.isInstanceOf(NotaryException::class.java)
assertThat(firstLatch.await(5, TimeUnit.SECONDS)).isTrue()
firstSubscription.unsubscribe()
assertThat(aliceClient.stateMachinesSnapshot()).isEmpty()
// case 2: the notary exception is caught and wrapped in a custom exception
val secondStateAndRef = charlieClient.startFlow(::IssueFlow, defaultNotaryIdentity).returnValue.get()
charlieClient.startFlow(::SpendFlowWithCustomException, secondStateAndRef, aliceParty).returnValue.get()
val secondSubscription = aliceClient.stateMachinesFeed().updates.subscribe{
if (it is StateMachineUpdate.Removed && it.result.isFailure)
secondLatch.countDown()
}
assertThatThrownBy {
charlieClient.startFlow(::SpendFlowWithCustomException, secondStateAndRef, aliceParty).returnValue.getOrThrow()
}.isInstanceOf(DoubleSpendException::class.java)
assertThat(secondLatch.await(5, TimeUnit.SECONDS)).isTrue()
secondSubscription.unsubscribe()
assertThat(aliceClient.stateMachinesSnapshot()).isEmpty()
}
}
@StartableByRPC
class IssueFlow(val notary: Party): FlowLogic<StateAndRef<SingleOwnerState>>() {
@Suspendable
override fun call(): StateAndRef<SingleOwnerState> {
val partyAndReference = PartyAndReference(ourIdentity, OpaqueBytes.of(1))
val txBuilder = DummyContract.generateInitial(Random().nextInt(), notary, partyAndReference)
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val notarised = subFlow(FinalityFlow(signedTransaction, emptySet<FlowSession>()))
return notarised.coreTransaction.outRef(0)
}
}
@StartableByRPC
@InitiatingFlow
class SpendFlow(private val stateAndRef: StateAndRef<SingleOwnerState>, private val newOwner: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val txBuilder = DummyContract.move(stateAndRef, newOwner)
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val sessionWithCounterParty = initiateFlow(newOwner)
sessionWithCounterParty.sendAndReceive<String>("initial-message")
subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
}
}
@InitiatedBy(SpendFlow::class)
class AcceptSpendFlow(private val otherSide: FlowSession): FlowLogic<Unit>() {
@Suspendable
override fun call() {
otherSide.receive<String>()
otherSide.send("initial-response")
subFlow(ReceiveFinalityFlow(otherSide))
}
}
@StartableByRPC
@InitiatingFlow
class SpendFlowWithCustomException(private val stateAndRef: StateAndRef<SingleOwnerState>, private val newOwner: Party):
FlowLogic<Unit>() {
@Suspendable
override fun call() {
val txBuilder = DummyContract.move(stateAndRef, newOwner)
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val sessionWithCounterParty = initiateFlow(newOwner)
sessionWithCounterParty.sendAndReceive<String>("initial-message")
try {
subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
} catch (e: NotaryException) {
throw DoubleSpendException("double spend!", e)
}
}
}
@InitiatedBy(SpendFlowWithCustomException::class)
class AcceptSpendFlowWithCustomException(private val otherSide: FlowSession): FlowLogic<Unit>() {
@Suspendable
override fun call() {
otherSide.receive<String>()
otherSide.send("initial-response")
subFlow(ReceiveFinalityFlow(otherSide))
}
}
class DoubleSpendException(message: String, cause: Throwable): FlowException(message, cause)
}

View File

@ -1,9 +1,13 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.newSecureRandom
import net.corda.core.flows.FlowException
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.ReceiveTransactionFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.internal.bufferUntilSubscribed
@ -307,10 +311,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
object FinalityDoctor : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (currentState.flowLogic is FinalityHandler || isFromReceiveFinalityFlow(newError)) {
return if (currentState.flowLogic is FinalityHandler) {
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState", newError)
Diagnosis.OVERNIGHT_OBSERVATION
} else if (isFromReceiveFinalityFlow(newError)) {
if (isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinality(newError)) {
// no need to keep around the flow, since notarisation has already failed at the counterparty.
Diagnosis.NOT_MY_SPECIALTY
} else {
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState", newError)
Diagnosis.OVERNIGHT_OBSERVATION
}
} else {
Diagnosis.NOT_MY_SPECIALTY
}
@ -319,6 +332,36 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
private fun isFromReceiveFinalityFlow(throwable: Throwable): Boolean {
return throwable.stackTrace.any { it.className == ReceiveFinalityFlow::class.java.name }
}
private fun isErrorPropagatedFromCounterparty(error: Throwable): Boolean {
return when(error) {
is UnexpectedFlowEndException -> {
val peer = DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", error).value
peer != null
}
is FlowException -> {
val peer = DeclaredField<Party?>(FlowException::class.java, "peer", error).value
peer != null
}
else -> false
}
}
/**
* This method will return true if [ReceiveTransactionFlow] is at the top of the stack during the error.
* As a result, if the failure happened during a sub-flow invoked from [ReceiveTransactionFlow], the method will return false.
*
* This is because in the latter case, the transaction might have already been finalised and deleting the flow
* would introduce risk for inconsistency between nodes.
*/
private fun isErrorThrownDuringReceiveFinality(error: Throwable): Boolean {
val strippedStacktrace = error.stackTrace
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
return strippedStacktrace.isNotEmpty() &&
strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!! )
}
}
/**