diff --git a/.ci/api-current.txt b/.ci/api-current.txt index b1923bb3d1..e00001eee5 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2542,7 +2542,9 @@ public final class net.corda.core.flows.FinalityFlow extends net.corda.core.flow public (net.corda.core.transactions.SignedTransaction, java.util.Collection, java.util.Collection, net.corda.core.utilities.ProgressTracker) public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord) public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord, net.corda.core.utilities.ProgressTracker) + public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord, net.corda.core.utilities.ProgressTracker, int, kotlin.jvm.internal.DefaultConstructorMarker) public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.utilities.ProgressTracker) + public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.utilities.ProgressTracker, int, kotlin.jvm.internal.DefaultConstructorMarker) public (net.corda.core.transactions.SignedTransaction, java.util.Set) public (net.corda.core.transactions.SignedTransaction, java.util.Set, net.corda.core.utilities.ProgressTracker) public (net.corda.core.transactions.SignedTransaction, net.corda.core.flows.FlowSession, net.corda.core.flows.FlowSession...) @@ -3140,7 +3142,6 @@ public final class net.corda.core.flows.ReceiveFinalityFlow extends net.corda.co public (net.corda.core.flows.FlowSession) public (net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash) public (net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord) - public (net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker) @Suspendable @NotNull public net.corda.core.transactions.SignedTransaction call() diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index 4b26c624fa..d9857a3350 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -205,10 +205,9 @@ class FinalityFlowTests : WithFinality { assertEquals(TransactionStatus.VERIFIED, txnStatusBob) try { - aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), propagateDoubleSpendErrorToPeers = true)).resultFuture.getOrThrow() + aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), handlePropagatedNotaryError = true)).resultFuture.getOrThrow() } catch (e: NotaryException) { - // note: ReceiveFinalityFlow un-notarised transaction clean-up takes place upon catching NotaryError.Conflict val stxId = (e.error as NotaryError.Conflict).txId assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) @@ -217,15 +216,14 @@ class FinalityFlowTests : WithFinality { } try { - aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), propagateDoubleSpendErrorToPeers = false)).resultFuture.getOrThrow() + aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), handlePropagatedNotaryError = false)).resultFuture.getOrThrow() } catch (e: NotaryException) { - // note: ReceiveFinalityFlow un-notarised transaction clean-up takes place upon catching UnexpectedFlowEndException val stxId = (e.error as NotaryError.Conflict).txId assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) - assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) - assertTxnRemovedFromDatabase(bobNode, stxId) + val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() + assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatus) } } @@ -320,7 +318,7 @@ class FinalityFlowTests : WithFinality { } @Test(timeout=300_000) - fun `two phase finality flow successfully removes un-notarised transaction where initiator fails to send notary signature`() { + fun `two phase finality flow keeps un-notarised transaction where initiator fails to send notary signature`() { val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY) val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() @@ -329,10 +327,8 @@ class FinalityFlowTests : WithFinality { } catch (e: UnexpectedFlowEndException) { val stxId = SecureHash.parse(e.message) - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) - assertTxnRemovedFromDatabase(aliceNode, stxId) - assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) - assertTxnRemovedFromDatabase(bobNode, stxId) + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() + assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob) } } @@ -352,15 +348,15 @@ class FinalityFlowTests : WithFinality { @StartableByRPC @InitiatingFlow class SpendFlow(private val stateAndRef: StateAndRef, private val newOwner: Party, - private val propagateDoubleSpendErrorToPeers: Boolean? = null) : FlowLogic() { + private val handlePropagatedNotaryError: Boolean = false) : FlowLogic() { @Suspendable override fun call(): SignedTransaction { val txBuilder = DummyContract.move(stateAndRef, newOwner) val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey) val sessionWithCounterParty = initiateFlow(newOwner) - sessionWithCounterParty.sendAndReceive("initial-message") - return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty), propagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers)) + sessionWithCounterParty.send(handlePropagatedNotaryError) + return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty))) } } @@ -369,10 +365,8 @@ class FinalityFlowTests : WithFinality { @Suspendable override fun call() { - otherSide.receive() - otherSide.send("initial-response") - - subFlow(ReceiveFinalityFlow(otherSide)) + val handleNotaryError = otherSide.receive().unwrap { it } + subFlow(ReceiveFinalityFlow(otherSide, handlePropagatedNotaryError = handleNotaryError)) } } diff --git a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt index 2c738549bd..6cdbc2b832 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt @@ -55,14 +55,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, override val progressTracker: ProgressTracker, private val sessions: Collection, private val newApi: Boolean, - private val statesToRecord: StatesToRecord = ONLY_RELEVANT, - private val propagateDoubleSpendErrorToPeers: Boolean? = null) : FlowLogic() { + private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic() { @CordaInternal - data class ExtraConstructorArgs(val oldParticipants: Collection, val sessions: Collection, val newApi: Boolean, val statesToRecord: StatesToRecord, val propagateDoubleSpendErrorToPeers: Boolean?) + data class ExtraConstructorArgs(val oldParticipants: Collection, val sessions: Collection, val newApi: Boolean, val statesToRecord: StatesToRecord) @CordaInternal - fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord, propagateDoubleSpendErrorToPeers) + fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord) @Deprecated(DEPRECATION_MSG) constructor(transaction: SignedTransaction, extraRecipients: Set, progressTracker: ProgressTracker) : this( @@ -91,15 +90,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, * @param transaction What to commit. * @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can * also be provided. - * @param propagateDoubleSpendErrorToPeers Whether to catch and propagate Double Spend exception to peers. */ @JvmOverloads constructor( transaction: SignedTransaction, sessions: Collection, - progressTracker: ProgressTracker = tracker(), - propagateDoubleSpendErrorToPeers: Boolean? = null - ) : this(transaction, emptyList(), progressTracker, sessions, true, propagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers) + progressTracker: ProgressTracker = tracker() + ) : this(transaction, emptyList(), progressTracker, sessions, true) /** * Notarise the given transaction and broadcast it to all the participants. @@ -108,16 +105,14 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, * @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can * also be provided. * @param statesToRecord Which states to commit to the vault. - * @param propagateDoubleSpendErrorToPeers Whether to catch and propagate Double Spend exception to peers. */ @JvmOverloads constructor( transaction: SignedTransaction, sessions: Collection, statesToRecord: StatesToRecord, - progressTracker: ProgressTracker = tracker(), - propagateDoubleSpendErrorToPeers: Boolean? = null - ) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord, propagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers) + progressTracker: ProgressTracker = tracker() + ) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord) /** * Notarise the given transaction and broadcast it to all the participants. @@ -154,13 +149,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, @Suppress("ClassNaming") object BROADCASTING_POST_NOTARISATION : ProgressTracker.Step("Broadcasting notary signature") @Suppress("ClassNaming") - object BROADCASTING_DOUBLE_SPEND_ERROR : ProgressTracker.Step("Broadcasting notary double spend error") + object BROADCASTING_NOTARY_ERROR : ProgressTracker.Step("Broadcasting notary error") @Suppress("ClassNaming") object FINALISING_TRANSACTION : ProgressTracker.Step("Finalising transaction locally") object BROADCASTING : ProgressTracker.Step("Broadcasting notarised transaction to other participants") @JvmStatic - fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_DOUBLE_SPEND_ERROR, FINALISING_TRANSACTION, BROADCASTING) + fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_NOTARY_ERROR, FINALISING_TRANSACTION, BROADCASTING) } @Suspendable @@ -235,12 +230,10 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, return stxn } catch (e: NotaryException) { - if (e.error is NotaryError.Conflict && useTwoPhaseFinality) { - (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(e.error.txId) - val overridePropagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers ?: - (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY) - if (overridePropagateDoubleSpendErrorToPeers && newPlatformSessions.isNotEmpty()) { - broadcastDoubleSpendError(newPlatformSessions, e) + if (useTwoPhaseFinality) { + (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(transaction.id) + if (newPlatformSessions.isNotEmpty()) { + broadcastNotaryError(newPlatformSessions, e) } else sleep(Duration.ZERO) // force checkpoint to persist db update. } throw e @@ -298,17 +291,17 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } @Suspendable - private fun broadcastDoubleSpendError(sessions: Collection, error: NotaryException) { - progressTracker.currentStep = BROADCASTING_DOUBLE_SPEND_ERROR + private fun broadcastNotaryError(sessions: Collection, error: NotaryException) { + progressTracker.currentStep = BROADCASTING_NOTARY_ERROR serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastDoubleSpendError", flowLogic = this) { - logger.info("Broadcasting notary double spend error.") + logger.info("Broadcasting notary error.") sessions.forEach { session -> try { - logger.debug { "Sending notary double spend error to party $session." } + logger.debug { "Sending notary error to party $session." } session.send(Try.Failure>(error)) } catch (e: UnexpectedFlowEndException) { throw UnexpectedFlowEndException( - "${session.counterparty} has finished prematurely and we're trying to send them a notary double spend error. " + + "${session.counterparty} has finished prematurely and we're trying to send them a notary error. " + "Did they forget to call ReceiveFinalityFlow? (${e.message})", e.cause, e.originalErrorId @@ -457,10 +450,12 @@ object NotarySigCheck { * @param expectedTxId Expected ID of the transaction that's about to be received. This is typically retrieved from * [SignTransactionFlow]. Setting it to null disables the expected transaction ID check. * @param statesToRecord Which states to commit to the vault. Defaults to [StatesToRecord.ONLY_RELEVANT]. + * @param handlePropagatedNotaryError Whether to catch and propagate Double Spend exception to peers. */ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession: FlowSession, private val expectedTxId: SecureHash? = null, - private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic() { + private val statesToRecord: StatesToRecord = ONLY_RELEVANT, + private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic() { @Suppress("ComplexMethod") @Suspendable override fun call(): SignedTransaction { @@ -483,21 +478,18 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) logger.info("Peer finalised transaction with notary signature.") } - } catch(throwable: NotaryException) { - if(throwable.error is NotaryError.Conflict) { - logger.info("Peer received double spend error.") + } catch (e: NotaryException) { + logger.info("Peer received notary error.") + val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?: + (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY) + if (overrideHandlePropagatedNotaryError) { (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id) sleep(Duration.ZERO) // force checkpoint to persist db update. + throw e + } + else { + otherSideSession.receive() // simulate unexpected flow end } - throw throwable - } catch (e: UnexpectedFlowEndException) { - (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id) - sleep(Duration.ZERO) // force checkpoint to persist db update. - throw UnexpectedFlowEndException( - "${otherSideSession.counterparty} has finished prematurely whilst awaiting transaction notary signature.", - e.cause, - e.originalErrorId - ) } } else { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt index b90f23437e..67a8f8144b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt @@ -249,6 +249,7 @@ class FlowHospitalTest { it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) } waitForAllFlowsToComplete(nodeAHandle) + waitForAllFlowsToComplete(nodeBHandle) } // 1 is the notary failing to notarise and propagating the error // 2 is the receiving flow failing due to the unexpected session end error @@ -321,6 +322,8 @@ class FlowHospitalTest { it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) } + waitForAllFlowsToComplete(nodeAHandle) + waitForAllFlowsToComplete(nodeBHandle) } // 1 is the notary failing to notarise and propagating the error // 2 is the receiving flow failing due to the unexpected session end error @@ -378,6 +381,7 @@ class FlowHospitalTest { it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref, true).returnValue.getOrThrow(20.seconds) } waitForAllFlowsToComplete(nodeAHandle) + waitForAllFlowsToComplete(nodeBHandle) } // 1 is the notary failing to notarise and propagating the error assertEquals(1, dischargedCounter)