CORDA-2228: Exceptions emanating from ReceiveFinalityFlow are sent to the flow hospital (#4621)

This commit is contained in:
Shams Asari 2019-01-23 16:24:49 +00:00 committed by GitHub
parent e93327bb6a
commit 5bb5244e55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 117 additions and 38 deletions

View File

@ -0,0 +1,81 @@
package net.corda.core.flows
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.node.services.queryBy
import net.corda.core.toFuture
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.GBP
import net.corda.finance.POUNDS
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.flows.CashPaymentReceiverFlow
import net.corda.node.services.statemachine.StaffedFlowHospital.*
import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.Flow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetworkNotarySpec
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import rx.Observable
class ReceiveFinalityFlowTest {
private val mockNet = InternalMockNetwork(notarySpecs = listOf(MockNetworkNotarySpec(DUMMY_NOTARY_NAME, validating = false)))
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `sent to flow hospital on error and retry on node restart`() {
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, additionalCordapps = FINANCE_CORDAPPS))
// Bob initially does not have the finance contracts CorDapp so that it can throw an exception in ReceiveFinalityFlow when receiving
// the payment from Alice
var bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, additionalCordapps = listOf(FINANCE_WORKFLOWS_CORDAPP)))
val paymentReceiverFuture = bob.smm.track().updates.filter { it.logic is CashPaymentReceiverFlow }.map { it.logic.runId }.toFuture()
alice.services.startFlow(CashIssueAndPaymentFlow(
100.POUNDS,
OpaqueBytes.of(0),
bob.info.singleIdentity(),
false,
mockNet.defaultNotaryIdentity
))
mockNet.runNetwork()
val paymentReceiverId = paymentReceiverFuture.getOrThrow()
assertThat(bob.services.vaultService.queryBy<FungibleAsset<*>>().states).isEmpty()
bob.assertFlowSentForObservationDueToConstraintError(paymentReceiverId)
// Restart Bob with the contracts CorDapp so that it can recover from the error
bob = mockNet.restartNode(bob, parameters = InternalMockNodeParameters(additionalCordapps = listOf(FINANCE_CONTRACTS_CORDAPP)))
mockNet.runNetwork()
assertThat(bob.services.getCashBalance(GBP)).isEqualTo(100.POUNDS)
}
private inline fun <reified R : MedicalRecord> TestStartedNode.medicalRecordsOfType(): Observable<R> {
return smm
.flowHospital
.track()
.let { it.updates.startWith(it.snapshot) }
.ofType(R::class.java)
}
private fun TestStartedNode.assertFlowSentForObservationDueToConstraintError(runId: StateMachineRunId) {
val observation = medicalRecordsOfType<Flow>()
.filter { it.flowId == runId }
.toBlocking()
.first()
assertThat(observation.outcome).isEqualTo(Outcome.OVERNIGHT_OBSERVATION)
assertThat(observation.by).contains(FinalityDoctor)
val error = observation.errors.single()
assertThat(error).isInstanceOf(TransactionVerificationException.ContractConstraintRejection::class.java)
}
}

View File

@ -12,7 +12,7 @@ import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.ContractUpgradeWireTransaction import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
class FinalityHandler(val sender: FlowSession) : FlowLogic<Unit>() { class FinalityHandler(private val sender: FlowSession) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
subFlow(ReceiveTransactionFlow(sender, true, StatesToRecord.ONLY_RELEVANT)) subFlow(ReceiveTransactionFlow(sender, true, StatesToRecord.ONLY_RELEVANT))

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine package net.corda.node.services.statemachine
import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.newSecureRandom
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
@ -298,23 +299,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
} }
} }
/**
* Parks [FinalityHandler]s for observation.
*/
object FinalityDoctor : Staff { object FinalityDoctor : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis { override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
return if (currentState.flowLogic is FinalityHandler) { return if (currentState.flowLogic is FinalityHandler || isFromReceiveFinalityFlow(newError)) {
warn(currentState.flowLogic, flowFiber, currentState) log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
"the flow by re-starting the node. State machine state: $currentState")
Diagnosis.OVERNIGHT_OBSERVATION Diagnosis.OVERNIGHT_OBSERVATION
} else { } else {
Diagnosis.NOT_MY_SPECIALTY Diagnosis.NOT_MY_SPECIALTY
} }
} }
private fun warn(flowLogic: FinalityHandler, flowFiber: FlowFiber, currentState: StateMachineState) { private fun isFromReceiveFinalityFlow(throwable: Throwable): Boolean {
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " + return throwable.stackTrace.any { it.className == ReceiveFinalityFlow::class.java.name }
"the flow by re-starting the node. State machine state: $currentState, initiating party was: " +
"${flowLogic.sender.counterparty}")
} }
} }
} }

View File

@ -272,25 +272,25 @@ class FlowFrameworkTests {
.addCommand(dummyCommand(alice.owningKey)) .addCommand(dummyCommand(alice.owningKey))
val stx = aliceNode.services.signInitialTransaction(ptx) val stx = aliceNode.services.signInitialTransaction(ptx)
val committerStx = aliceNode.registerCordappFlowFactory(CommitReceiverFlow::class) { val committerStx = aliceNode.registerCordappFlowFactory(CommitterFlow::class) {
CommitterFlow(it) CommitReceiverFlow(it, stx.id)
}.flatMap { it.stateMachine.resultFuture } }.flatMap { it.stateMachine.resultFuture }
// The waitForLedgerCommit call has to occur on separate flow // The waitForLedgerCommit call has to occur on separate flow
val waiterStx = bobNode.services.startFlow(WaiterFlow(stx.id)).resultFuture val waiterStx = bobNode.services.startFlow(WaitForLedgerCommitFlow(stx.id)).resultFuture
val commitReceiverStx = bobNode.services.startFlow(CommitReceiverFlow(stx, alice)).resultFuture val commitReceiverStx = bobNode.services.startFlow(CommitterFlow(stx, alice)).resultFuture
mockNet.runNetwork() mockNet.runNetwork()
assertThat(committerStx.getOrThrow()).isEqualTo(waiterStx.getOrThrow()).isEqualTo(commitReceiverStx.getOrThrow()) assertThat(committerStx.getOrThrow()).isEqualTo(waiterStx.getOrThrow()).isEqualTo(commitReceiverStx.getOrThrow())
} }
@Test @Test
fun `committer throws exception before calling the finality flow`() { fun `waitForLedgerCommit throws exception if any active session ends in error`() {
val ptx = TransactionBuilder(notary = notaryIdentity) val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID) .addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand()) .addCommand(dummyCommand())
val stx = aliceNode.services.signInitialTransaction(ptx) val stx = aliceNode.services.signInitialTransaction(ptx)
aliceNode.registerCordappFlowFactory(CommitReceiverFlow::class) { CommitterFlow(it) { throw Exception("Error") } } aliceNode.registerCordappFlowFactory(WaitForLedgerCommitFlow::class) { ExceptionFlow { throw Exception("Error") } }
val waiter = bobNode.services.startFlow(CommitReceiverFlow(stx, alice)).resultFuture val waiter = bobNode.services.startFlow(WaitForLedgerCommitFlow(stx.id, alice)).resultFuture
mockNet.runNetwork() mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
waiter.getOrThrow() waiter.getOrThrow()
@ -357,7 +357,7 @@ class FlowFrameworkTests {
} }
@Test @Test
fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() { fun `session init with unknown class is sent to the flow hospital, from where we then drop it`() {
aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob) aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob)
mockNet.runNetwork() mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital
@ -484,28 +484,29 @@ class FlowFrameworkTests {
} }
} }
class WaiterFlow(private val txId: SecureHash) : FlowLogic<SignedTransaction>() { @InitiatingFlow
class WaitForLedgerCommitFlow(private val txId: SecureHash, private val party: Party? = null) : FlowLogic<SignedTransaction>() {
@Suspendable @Suspendable
override fun call(): SignedTransaction = waitForLedgerCommit(txId) override fun call(): SignedTransaction {
if (party != null) {
initiateFlow(party).send(Unit)
}
return waitForLedgerCommit(txId)
}
} }
@InitiatingFlow @InitiatingFlow
class CommitReceiverFlow(val stx: SignedTransaction, private val otherParty: Party) : FlowLogic<SignedTransaction>() { class CommitterFlow(private val stx: SignedTransaction, private val otherParty: Party) : FlowLogic<SignedTransaction>() {
@Suspendable @Suspendable
override fun call(): SignedTransaction { override fun call(): SignedTransaction {
val otherPartySession = initiateFlow(otherParty) val session = initiateFlow(otherParty)
otherPartySession.send(stx) return subFlow(FinalityFlow(stx, session))
return subFlow(ReceiveFinalityFlow(otherPartySession, expectedTxId = stx.id))
} }
} }
class CommitterFlow(private val otherPartySession: FlowSession, private val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() { class CommitReceiverFlow(private val otherSide: FlowSession, private val txId: SecureHash) : FlowLogic<SignedTransaction>() {
@Suspendable @Suspendable
override fun call(): SignedTransaction { override fun call(): SignedTransaction = subFlow(ReceiveFinalityFlow(otherSide, expectedTxId = txId))
val stx = otherPartySession.receive<SignedTransaction>().unwrap { it }
if (throwException != null) throw throwException.invoke()
return subFlow(FinalityFlow(stx, otherPartySession))
}
} }
private class LazyServiceHubAccessFlow : FlowLogic<Unit>() { private class LazyServiceHubAccessFlow : FlowLogic<Unit>() {

View File

@ -1,7 +1,5 @@
package net.corda.testing.node.internal package net.corda.testing.node.internal
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.DoNotImplement import net.corda.core.DoNotImplement
@ -164,7 +162,6 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
var nextNodeId = 0 var nextNodeId = 0
private set private set
private val filesystem = Jimfs.newFileSystem(unix())
private val busyLatch = ReusableLatch() private val busyLatch = ReusableLatch()
val messagingNetwork = InMemoryMessagingNetwork.create(networkSendManuallyPumped, servicePeerAllocationStrategy, busyLatch) val messagingNetwork = InMemoryMessagingNetwork.create(networkSendManuallyPumped, servicePeerAllocationStrategy, busyLatch)
// A unique identifier for this network to segregate databases with the same nodeID but different networks. // A unique identifier for this network to segregate databases with the same nodeID but different networks.
@ -231,7 +228,6 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
init { init {
try { try {
filesystem.getPath("/nodes").createDirectory()
val notaryInfos = generateNotaryIdentities() val notaryInfos = generateNotaryIdentities()
networkParameters = initialNetworkParameters.copy(notaries = notaryInfos) networkParameters = initialNetworkParameters.copy(notaries = notaryInfos)
// The network parameters must be serialised before starting any of the nodes // The network parameters must be serialised before starting any of the nodes
@ -478,16 +474,20 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
return node return node
} }
fun restartNode(node: TestStartedNode, nodeFactory: (MockNodeArgs) -> MockNode): TestStartedNode { fun restartNode(
node: TestStartedNode,
parameters: InternalMockNodeParameters = InternalMockNodeParameters(),
nodeFactory: (MockNodeArgs) -> MockNode = defaultFactory
): TestStartedNode {
node.internals.disableDBCloseOnStop() node.internals.disableDBCloseOnStop()
node.dispose() node.dispose()
return createNode( return createNode(
InternalMockNodeParameters(legalName = node.internals.configuration.myLegalName, forcedID = node.internals.id), parameters.copy(legalName = node.internals.configuration.myLegalName, forcedID = node.internals.id),
nodeFactory nodeFactory
) )
} }
fun restartNode(node: TestStartedNode): TestStartedNode = restartNode(node, defaultFactory) fun baseDirectory(node: TestStartedNode): Path = baseDirectory(node.internals.id)
fun baseDirectory(nodeId: Int): Path = testDirectory / "nodes/$nodeId" fun baseDirectory(nodeId: Int): Path = testDirectory / "nodes/$nodeId"

View File

@ -64,7 +64,7 @@ val FINANCE_CONTRACTS_CORDAPP: TestCordappImpl = findCordapp("net.corda.finance.
val FINANCE_WORKFLOWS_CORDAPP: TestCordappImpl = findCordapp("net.corda.finance.flows") val FINANCE_WORKFLOWS_CORDAPP: TestCordappImpl = findCordapp("net.corda.finance.flows")
@JvmField @JvmField
val FINANCE_CORDAPPS: Set<TestCordappInternal> = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP) val FINANCE_CORDAPPS: Set<TestCordappImpl> = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP)
/** /**
* *Custom* CorDapp containing the contents of the `net.corda.testing.contracts` package, i.e. the dummy contracts. This is not a real CorDapp * *Custom* CorDapp containing the contents of the `net.corda.testing.contracts` package, i.e. the dummy contracts. This is not a real CorDapp