ENT-9147 Propagate and handle Notary Error (Part 2) (#7346)

This commit is contained in:
Jose Coll 2023-04-26 09:06:32 +01:00 committed by GitHub
parent 5f685be474
commit 1d4feedc62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 57 deletions

View File

@ -2542,7 +2542,9 @@ public final class net.corda.core.flows.FinalityFlow extends net.corda.core.flow
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Collection<? extends net.corda.core.flows.FlowSession>, java.util.Collection<net.corda.core.identity.Party>, net.corda.core.utilities.ProgressTracker)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Collection<? extends net.corda.core.flows.FlowSession>, net.corda.core.node.StatesToRecord)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Collection<? extends net.corda.core.flows.FlowSession>, net.corda.core.node.StatesToRecord, net.corda.core.utilities.ProgressTracker)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord, net.corda.core.utilities.ProgressTracker, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Collection<? extends net.corda.core.flows.FlowSession>, net.corda.core.utilities.ProgressTracker)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.utilities.ProgressTracker, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Set<net.corda.core.identity.Party>)
public <init>(net.corda.core.transactions.SignedTransaction, java.util.Set<net.corda.core.identity.Party>, net.corda.core.utilities.ProgressTracker)
public <init>(net.corda.core.transactions.SignedTransaction, net.corda.core.flows.FlowSession, net.corda.core.flows.FlowSession...)
@ -3140,7 +3142,6 @@ public final class net.corda.core.flows.ReceiveFinalityFlow extends net.corda.co
public <init>(net.corda.core.flows.FlowSession)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord)
public <init>(net.corda.core.flows.FlowSession, net.corda.core.crypto.SecureHash, net.corda.core.node.StatesToRecord, int, kotlin.jvm.internal.DefaultConstructorMarker)
@Suspendable
@NotNull
public net.corda.core.transactions.SignedTransaction call()

View File

@ -205,10 +205,9 @@ class FinalityFlowTests : WithFinality {
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
try {
aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), propagateDoubleSpendErrorToPeers = true)).resultFuture.getOrThrow()
aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), handlePropagatedNotaryError = true)).resultFuture.getOrThrow()
}
catch (e: NotaryException) {
// note: ReceiveFinalityFlow un-notarised transaction clean-up takes place upon catching NotaryError.Conflict
val stxId = (e.error as NotaryError.Conflict).txId
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId)
@ -217,15 +216,14 @@ class FinalityFlowTests : WithFinality {
}
try {
aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), propagateDoubleSpendErrorToPeers = false)).resultFuture.getOrThrow()
aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), handlePropagatedNotaryError = false)).resultFuture.getOrThrow()
}
catch (e: NotaryException) {
// note: ReceiveFinalityFlow un-notarised transaction clean-up takes place upon catching UnexpectedFlowEndException
val stxId = (e.error as NotaryError.Conflict).txId
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId)
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
assertTxnRemovedFromDatabase(bobNode, stxId)
val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatus)
}
}
@ -320,7 +318,7 @@ class FinalityFlowTests : WithFinality {
}
@Test(timeout=300_000)
fun `two phase finality flow successfully removes un-notarised transaction where initiator fails to send notary signature`() {
fun `two phase finality flow keeps un-notarised transaction where initiator fails to send notary signature`() {
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
@ -329,10 +327,8 @@ class FinalityFlowTests : WithFinality {
}
catch (e: UnexpectedFlowEndException) {
val stxId = SecureHash.parse(e.message)
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId)
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
assertTxnRemovedFromDatabase(bobNode, stxId)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob)
}
}
@ -352,15 +348,15 @@ class FinalityFlowTests : WithFinality {
@StartableByRPC
@InitiatingFlow
class SpendFlow(private val stateAndRef: StateAndRef<DummyContract.SingleOwnerState>, private val newOwner: Party,
private val propagateDoubleSpendErrorToPeers: Boolean? = null) : FlowLogic<SignedTransaction>() {
private val handlePropagatedNotaryError: Boolean = false) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val txBuilder = DummyContract.move(stateAndRef, newOwner)
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val sessionWithCounterParty = initiateFlow(newOwner)
sessionWithCounterParty.sendAndReceive<String>("initial-message")
return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty), propagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers))
sessionWithCounterParty.send(handlePropagatedNotaryError)
return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
}
}
@ -369,10 +365,8 @@ class FinalityFlowTests : WithFinality {
@Suspendable
override fun call() {
otherSide.receive<String>()
otherSide.send("initial-response")
subFlow(ReceiveFinalityFlow(otherSide))
val handleNotaryError = otherSide.receive<Boolean>().unwrap { it }
subFlow(ReceiveFinalityFlow(otherSide, handlePropagatedNotaryError = handleNotaryError))
}
}

View File

@ -55,14 +55,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
override val progressTracker: ProgressTracker,
private val sessions: Collection<FlowSession>,
private val newApi: Boolean,
private val statesToRecord: StatesToRecord = ONLY_RELEVANT,
private val propagateDoubleSpendErrorToPeers: Boolean? = null) : FlowLogic<SignedTransaction>() {
private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
@CordaInternal
data class ExtraConstructorArgs(val oldParticipants: Collection<Party>, val sessions: Collection<FlowSession>, val newApi: Boolean, val statesToRecord: StatesToRecord, val propagateDoubleSpendErrorToPeers: Boolean?)
data class ExtraConstructorArgs(val oldParticipants: Collection<Party>, val sessions: Collection<FlowSession>, val newApi: Boolean, val statesToRecord: StatesToRecord)
@CordaInternal
fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord, propagateDoubleSpendErrorToPeers)
fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord)
@Deprecated(DEPRECATION_MSG)
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>, progressTracker: ProgressTracker) : this(
@ -91,15 +90,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
* @param transaction What to commit.
* @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
* also be provided.
* @param propagateDoubleSpendErrorToPeers Whether to catch and propagate Double Spend exception to peers.
*/
@JvmOverloads
constructor(
transaction: SignedTransaction,
sessions: Collection<FlowSession>,
progressTracker: ProgressTracker = tracker(),
propagateDoubleSpendErrorToPeers: Boolean? = null
) : this(transaction, emptyList(), progressTracker, sessions, true, propagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers)
progressTracker: ProgressTracker = tracker()
) : this(transaction, emptyList(), progressTracker, sessions, true)
/**
* Notarise the given transaction and broadcast it to all the participants.
@ -108,16 +105,14 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
* @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
* also be provided.
* @param statesToRecord Which states to commit to the vault.
* @param propagateDoubleSpendErrorToPeers Whether to catch and propagate Double Spend exception to peers.
*/
@JvmOverloads
constructor(
transaction: SignedTransaction,
sessions: Collection<FlowSession>,
statesToRecord: StatesToRecord,
progressTracker: ProgressTracker = tracker(),
propagateDoubleSpendErrorToPeers: Boolean? = null
) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord, propagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers)
progressTracker: ProgressTracker = tracker()
) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord)
/**
* Notarise the given transaction and broadcast it to all the participants.
@ -154,13 +149,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
@Suppress("ClassNaming")
object BROADCASTING_POST_NOTARISATION : ProgressTracker.Step("Broadcasting notary signature")
@Suppress("ClassNaming")
object BROADCASTING_DOUBLE_SPEND_ERROR : ProgressTracker.Step("Broadcasting notary double spend error")
object BROADCASTING_NOTARY_ERROR : ProgressTracker.Step("Broadcasting notary error")
@Suppress("ClassNaming")
object FINALISING_TRANSACTION : ProgressTracker.Step("Finalising transaction locally")
object BROADCASTING : ProgressTracker.Step("Broadcasting notarised transaction to other participants")
@JvmStatic
fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_DOUBLE_SPEND_ERROR, FINALISING_TRANSACTION, BROADCASTING)
fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_NOTARY_ERROR, FINALISING_TRANSACTION, BROADCASTING)
}
@Suspendable
@ -235,12 +230,10 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
return stxn
}
catch (e: NotaryException) {
if (e.error is NotaryError.Conflict && useTwoPhaseFinality) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(e.error.txId)
val overridePropagateDoubleSpendErrorToPeers = propagateDoubleSpendErrorToPeers ?:
(serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overridePropagateDoubleSpendErrorToPeers && newPlatformSessions.isNotEmpty()) {
broadcastDoubleSpendError(newPlatformSessions, e)
if (useTwoPhaseFinality) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(transaction.id)
if (newPlatformSessions.isNotEmpty()) {
broadcastNotaryError(newPlatformSessions, e)
} else sleep(Duration.ZERO) // force checkpoint to persist db update.
}
throw e
@ -298,17 +291,17 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
}
@Suspendable
private fun broadcastDoubleSpendError(sessions: Collection<FlowSession>, error: NotaryException) {
progressTracker.currentStep = BROADCASTING_DOUBLE_SPEND_ERROR
private fun broadcastNotaryError(sessions: Collection<FlowSession>, error: NotaryException) {
progressTracker.currentStep = BROADCASTING_NOTARY_ERROR
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastDoubleSpendError", flowLogic = this) {
logger.info("Broadcasting notary double spend error.")
logger.info("Broadcasting notary error.")
sessions.forEach { session ->
try {
logger.debug { "Sending notary double spend error to party $session." }
logger.debug { "Sending notary error to party $session." }
session.send(Try.Failure<List<TransactionSignature>>(error))
} catch (e: UnexpectedFlowEndException) {
throw UnexpectedFlowEndException(
"${session.counterparty} has finished prematurely and we're trying to send them a notary double spend error. " +
"${session.counterparty} has finished prematurely and we're trying to send them a notary error. " +
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
e.cause,
e.originalErrorId
@ -457,10 +450,12 @@ object NotarySigCheck {
* @param expectedTxId Expected ID of the transaction that's about to be received. This is typically retrieved from
* [SignTransactionFlow]. Setting it to null disables the expected transaction ID check.
* @param statesToRecord Which states to commit to the vault. Defaults to [StatesToRecord.ONLY_RELEVANT].
* @param handlePropagatedNotaryError Whether to catch and propagate Double Spend exception to peers.
*/
class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
private val expectedTxId: SecureHash? = null,
private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
private val statesToRecord: StatesToRecord = ONLY_RELEVANT,
private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic<SignedTransaction>() {
@Suppress("ComplexMethod")
@Suspendable
override fun call(): SignedTransaction {
@ -483,21 +478,18 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
}
} catch(throwable: NotaryException) {
if(throwable.error is NotaryError.Conflict) {
logger.info("Peer received double spend error.")
} catch (e: NotaryException) {
logger.info("Peer received notary error.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?:
(serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overrideHandlePropagatedNotaryError) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e
}
else {
otherSideSession.receive<Any>() // simulate unexpected flow end
}
throw throwable
} catch (e: UnexpectedFlowEndException) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update.
throw UnexpectedFlowEndException(
"${otherSideSession.counterparty} has finished prematurely whilst awaiting transaction notary signature.",
e.cause,
e.originalErrorId
)
}
} else {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) {

View File

@ -249,6 +249,7 @@ class FlowHospitalTest {
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
}
waitForAllFlowsToComplete(nodeAHandle)
waitForAllFlowsToComplete(nodeBHandle)
}
// 1 is the notary failing to notarise and propagating the error
// 2 is the receiving flow failing due to the unexpected session end error
@ -321,6 +322,8 @@ class FlowHospitalTest {
it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
}
waitForAllFlowsToComplete(nodeAHandle)
waitForAllFlowsToComplete(nodeBHandle)
}
// 1 is the notary failing to notarise and propagating the error
// 2 is the receiving flow failing due to the unexpected session end error
@ -378,6 +381,7 @@ class FlowHospitalTest {
it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref, true).returnValue.getOrThrow(20.seconds)
}
waitForAllFlowsToComplete(nodeAHandle)
waitForAllFlowsToComplete(nodeBHandle)
}
// 1 is the notary failing to notarise and propagating the error
assertEquals(1, dischargedCounter)