From 7bd3f5dd33eba746caf918b1d5c37183aca8d732 Mon Sep 17 00:00:00 2001 From: Jose Coll Date: Wed, 19 Apr 2023 15:31:47 +0100 Subject: [PATCH] ENT-9147 Remove un-notarised transactions upon Double Spend. (#7324) --- .ci/api-current.txt | 2 - .../coretests/flows/FinalityFlowTests.kt | 51 +++++++- .../net/corda/core/flows/FinalityFlow.kt | 113 +++++++++++++----- .../core/internal/ServiceHubCoreInternal.kt | 9 ++ .../services/statemachine/FlowHospitalTest.kt | 18 ++- .../node/services/api/ServiceHubInternal.kt | 12 ++ .../persistence/DBTransactionStorage.kt | 21 ++++ .../node/utilities/AppendOnlyPersistentMap.kt | 3 + .../node/messaging/TwoPartyTradeFlowTests.kt | 6 + .../persistence/DBTransactionStorageTests.kt | 17 +++ .../net/corda/testing/flows/FlowTestsUtils.kt | 11 ++ .../node/internal/MockTransactionStorage.kt | 4 + .../kotlin/net/corda/testing/dsl/TestDSL.kt | 2 + 13 files changed, 226 insertions(+), 43 deletions(-) diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 56123e9a97..b1923bb3d1 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2542,9 +2542,7 @@ public final class net.corda.core.flows.FinalityFlow extends net.corda.core.flow public (net.corda.core.transactions.SignedTransaction, java.util.Collection, java.util.Collection, net.corda.core.utilities.ProgressTracker) public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord) public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord, net.corda.core.utilities.ProgressTracker) - public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.node.StatesToRecord, net.corda.core.utilities.ProgressTracker, int, kotlin.jvm.internal.DefaultConstructorMarker) public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.utilities.ProgressTracker) - public (net.corda.core.transactions.SignedTransaction, java.util.Collection, net.corda.core.utilities.ProgressTracker, int, kotlin.jvm.internal.DefaultConstructorMarker) public (net.corda.core.transactions.SignedTransaction, java.util.Set) public (net.corda.core.transactions.SignedTransaction, java.util.Set, net.corda.core.utilities.ProgressTracker) public (net.corda.core.transactions.SignedTransaction, net.corda.core.flows.FlowSession, net.corda.core.flows.FlowSession...) diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index 0fac4a11ce..cf9663729c 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -45,6 +45,7 @@ import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.issuedBy +import net.corda.node.services.persistence.DBTransactionStorage import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME @@ -182,13 +183,47 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - val (_, txnDsStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() - assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusAlice) + assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + // Note: double spend error not propagated to peers by default val (_, txnDsStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusBob) } } + @Test(timeout=300_000) + fun `two phase finality flow double spend transaction with double spend handling`() { + val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY) + + val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() + val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() + + val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + assertEquals(TransactionStatus.VERIFIED, txnStatusBob) + + try { + aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity(), handleDoubleSpend = true)).resultFuture.getOrThrow() + } + catch (e: NotaryException) { + val stxId = (e.error as NotaryError.Conflict).txId + assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertTxnRemovedFromDatabase(aliceNode, stxId) + assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertTxnRemovedFromDatabase(bobNode, stxId) + } + } + + private fun assertTxnRemovedFromDatabase(node: TestStartedNode, stxId: SecureHash) { + val fromDb = node.database.transaction { + session.createQuery( + "from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId", + DBTransactionStorage.DBTransaction::class.java + ).setParameter("transactionId", stxId.toString()).resultList.map { it } + } + assertEquals(0, fromDb.size) + } + @Test(timeout=300_000) fun `two phase finality flow double spend transaction from pre-2PF initiator`() { val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1) @@ -207,7 +242,9 @@ class FinalityFlowTests : WithFinality { catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertTxnRemovedFromDatabase(bobNode, stxId) assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertTxnRemovedFromDatabase(aliceNode, stxId) } } @@ -228,9 +265,10 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - val (_, txnDsStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() - assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusAlice) + assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertTxnRemovedFromDatabase(aliceNode, stxId) assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertTxnRemovedFromDatabase(bobNode, stxId) } } @@ -281,7 +319,8 @@ class FinalityFlowTests : WithFinality { @StartableByRPC @InitiatingFlow - class SpendFlow(private val stateAndRef: StateAndRef, private val newOwner: Party) : FlowLogic() { + class SpendFlow(private val stateAndRef: StateAndRef, private val newOwner: Party, + private val handleDoubleSpend: Boolean? = null) : FlowLogic() { @Suspendable override fun call(): SignedTransaction { @@ -289,7 +328,7 @@ class FinalityFlowTests : WithFinality { val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey) val sessionWithCounterParty = initiateFlow(newOwner) sessionWithCounterParty.sendAndReceive("initial-message") - return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty))) + return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty), handleDoubleSpend = handleDoubleSpend)) } } diff --git a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt index 932d2a01f4..f5a58f7d58 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt @@ -19,8 +19,10 @@ import net.corda.core.node.StatesToRecord.ONLY_RELEVANT import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.Try import net.corda.core.utilities.debug import net.corda.core.utilities.unwrap +import java.time.Duration /** * Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction @@ -46,13 +48,15 @@ import net.corda.core.utilities.unwrap // To maintain backwards compatibility with the old API, FinalityFlow can act both as an initiating flow and as an inlined flow. // This is only possible because a flow is only truly initiating when the first call to initiateFlow is made (where the // presence of @InitiatingFlow is checked). So the new API is inlined simply because that code path doesn't call initiateFlow. +@Suppress("TooManyFunctions") @InitiatingFlow class FinalityFlow private constructor(val transaction: SignedTransaction, private val oldParticipants: Collection, override val progressTracker: ProgressTracker, private val sessions: Collection, private val newApi: Boolean, - private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic() { + private val statesToRecord: StatesToRecord = ONLY_RELEVANT, + private val handleDoubleSpend: Boolean? = null) : FlowLogic() { @CordaInternal data class ExtraConstructorArgs(val oldParticipants: Collection, val sessions: Collection, val newApi: Boolean, val statesToRecord: StatesToRecord) @@ -87,13 +91,15 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, * @param transaction What to commit. * @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can * also be provided. + * @param handleDoubleSpend Whether to catch and propagate Double Spend exception to peers. */ @JvmOverloads constructor( transaction: SignedTransaction, sessions: Collection, - progressTracker: ProgressTracker = tracker() - ) : this(transaction, emptyList(), progressTracker, sessions, true) + progressTracker: ProgressTracker = tracker(), + handleDoubleSpend: Boolean? = null + ) : this(transaction, emptyList(), progressTracker, sessions, true, handleDoubleSpend = handleDoubleSpend) /** * Notarise the given transaction and broadcast it to all the participants. @@ -102,14 +108,16 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, * @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can * also be provided. * @param statesToRecord Which states to commit to the vault. + * @param handleDoubleSpend Whether to catch and propagate Double Spend exception to peers. */ @JvmOverloads constructor( transaction: SignedTransaction, sessions: Collection, statesToRecord: StatesToRecord, - progressTracker: ProgressTracker = tracker() - ) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord) + progressTracker: ProgressTracker = tracker(), + handleDoubleSpend: Boolean? = null + ) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord, handleDoubleSpend = handleDoubleSpend) /** * Notarise the given transaction and broadcast it to all the participants. @@ -146,11 +154,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, @Suppress("ClassNaming") object BROADCASTING_POST_NOTARISATION : ProgressTracker.Step("Broadcasting notary signature") @Suppress("ClassNaming") + object BROADCASTING_DOUBLE_SPEND_ERROR : ProgressTracker.Step("Broadcasting notary double spend error") + @Suppress("ClassNaming") object FINALISING_TRANSACTION : ProgressTracker.Step("Finalising transaction locally") object BROADCASTING : ProgressTracker.Step("Broadcasting notarised transaction to other participants") @JvmStatic - fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, FINALISING_TRANSACTION, BROADCASTING) + fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_DOUBLE_SPEND_ERROR, FINALISING_TRANSACTION, BROADCASTING) } @Suspendable @@ -202,28 +212,39 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, recordLocallyAndBroadcast(newPlatformSessions, transaction) } - val stxn = notariseOrRecord() - val notarySignatures = stxn.sigs - transaction.sigs.toSet() - if (notarySignatures.isNotEmpty()) { - if (useTwoPhaseFinality && newPlatformSessions.isNotEmpty()) { - broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures) - } - else { - progressTracker.currentStep = FINALISING_TRANSACTION - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { - (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord) - logger.info("Finalised transaction locally.") + try { + val stxn = notariseOrRecord() + val notarySignatures = stxn.sigs - transaction.sigs.toSet() + if (notarySignatures.isNotEmpty()) { + if (useTwoPhaseFinality && newPlatformSessions.isNotEmpty()) { + broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures) + } else { + progressTracker.currentStep = FINALISING_TRANSACTION + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { + (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord) + logger.info("Finalised transaction locally.") + } } } - } - if (!useTwoPhaseFinality || !needsNotarySignature(transaction)) { - broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn) - } else if (useTwoPhaseFinality && oldPlatformSessions.isNotEmpty()) { - broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn) + if (!useTwoPhaseFinality || !needsNotarySignature(transaction)) { + broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn) + } else if (useTwoPhaseFinality && oldPlatformSessions.isNotEmpty()) { + broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn) + } + return stxn + } + catch (e: NotaryException) { + if (e.error is NotaryError.Conflict && useTwoPhaseFinality) { + (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(e.error.txId) + val overrideHandleDoubleSpend = handleDoubleSpend ?: + (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY) + if (overrideHandleDoubleSpend && newPlatformSessions.isNotEmpty()) { + broadcastDoubleSpendError(newPlatformSessions, e) + } else sleep(Duration.ZERO) // force checkpoint to persist db update. + } + throw e } - - return stxn } @Suspendable @@ -257,7 +278,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, sessions.forEach { session -> try { logger.debug { "Sending notary signature to party $session." } - session.send(notarySignatures) + session.send(Try.Success(notarySignatures)) // remote will finalise txn with notary signature } catch (e: UnexpectedFlowEndException) { throw UnexpectedFlowEndException( @@ -276,6 +297,27 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } } + @Suspendable + private fun broadcastDoubleSpendError(sessions: Collection, error: NotaryException) { + progressTracker.currentStep = BROADCASTING_DOUBLE_SPEND_ERROR + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastDoubleSpendError", flowLogic = this) { + logger.info("Broadcasting notary double spend error.") + sessions.forEach { session -> + try { + logger.debug { "Sending notary double spend error to party $session." } + session.send(Try.Failure>(error)) + } catch (e: UnexpectedFlowEndException) { + throw UnexpectedFlowEndException( + "${session.counterparty} has finished prematurely and we're trying to send them a notary double spend error. " + + "Did they forget to call ReceiveFinalityFlow? (${e.message})", + e.cause, + e.originalErrorId + ) + } + } + } + } + @Suspendable private fun broadcastToOtherParticipants(externalTxParticipants: Set, sessions: Collection, tx: SignedTransaction) { if (externalTxParticipants.isEmpty() && sessions.isEmpty() && oldParticipants.isEmpty()) return @@ -433,12 +475,21 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession } otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck) logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.") - val notarySignatures = otherSideSession.receive>() - .unwrap { it } - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { - logger.debug { "Peer received notarised signature." } - (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) - logger.info("Peer finalised transaction with notary signature.") + + try { + val notarySignatures = otherSideSession.receive>>().unwrap { it.getOrThrow() } + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { + logger.debug { "Peer received notarised signature." } + (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) + logger.info("Peer finalised transaction with notary signature.") + } + } catch(throwable: NotaryException) { + if(throwable.error is NotaryError.Conflict) { + logger.info("Peer received double spend error.") + (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id) + sleep(Duration.ZERO) // force checkpoint to persist db update. + } + throw throwable } } else { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) { diff --git a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt index af7ce40179..36b44ed0e1 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt @@ -2,6 +2,7 @@ package net.corda.core.internal import co.paralleluniverse.fibers.Suspendable import net.corda.core.DeleteForDJVM +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.FlowTransactionMetadata import net.corda.core.internal.notary.NotaryService @@ -37,6 +38,14 @@ interface ServiceHubCoreInternal : ServiceHub { */ fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?= null) + /** + * Removes transaction from data store. + * This is expected to be run within a database transaction. + * + * @param id of transaction to remove. + */ + fun removeUnnotarisedTransaction(id: SecureHash) + /** * Stores [SignedTransaction] with extra signatures in the local transaction storage * diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt index 5c17bb7bac..b90f23437e 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt @@ -42,6 +42,7 @@ import net.corda.testing.core.CHARLIE_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver +import net.corda.testing.flows.waitForAllFlowsToComplete import net.corda.testing.node.User import net.corda.testing.node.internal.CustomCordapp import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP @@ -247,6 +248,7 @@ class FlowHospitalTest { it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) } + waitForAllFlowsToComplete(nodeAHandle) } // 1 is the notary failing to notarise and propagating the error // 2 is the receiving flow failing due to the unexpected session end error @@ -348,6 +350,7 @@ class FlowHospitalTest { val ref3 = it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeCHandle.nodeInfo.singleIdentity(), ref2).returnValue.getOrThrow(20.seconds) it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds) } + waitForAllFlowsToComplete(nodeAHandle) } assertEquals(0, dischargedCounter) assertEquals(1, observationCounter) @@ -374,6 +377,7 @@ class FlowHospitalTest { it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds) it.startFlow(::SpendStateAndCatchDoubleSpendFlow, nodeBHandle.nodeInfo.singleIdentity(), ref, true).returnValue.getOrThrow(20.seconds) } + waitForAllFlowsToComplete(nodeAHandle) } // 1 is the notary failing to notarise and propagating the error assertEquals(1, dischargedCounter) @@ -552,6 +556,7 @@ class FlowHospitalTest { var exceptionSeenInUserFlow = false } + @Suppress("TooGenericExceptionCaught") @Suspendable override fun call() { val consumeError = session.receive().unwrap { it } @@ -562,10 +567,15 @@ class FlowHospitalTest { }) try { subFlow(ReceiveFinalityFlow(session, stx.id)) - } catch (e: UnexpectedFlowEndException) { - exceptionSeenInUserFlow = true - if (!consumeError) { - throw e + } catch (ex: Exception) { + when (ex) { + is NotaryException, + is UnexpectedFlowEndException -> { + exceptionSeenInUserFlow = true + if (!consumeError) { + throw ex + } + } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index cf7623f825..cb917e0b51 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -251,6 +251,12 @@ interface ServiceHubInternal : ServiceHubCoreInternal { } } + override fun removeUnnotarisedTransaction(id: SecureHash) { + database.transaction { + validatedTransactions.removeUnnotarisedTransaction(id) + } + } + override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver = DbTransactionsResolver(flow) /** @@ -346,6 +352,12 @@ interface WritableTransactionStorage : TransactionStorage { */ fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata? = null): Boolean + /** + * Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store. + * Returns null if no transaction with the ID exists. + */ + fun removeUnnotarisedTransaction(id: SecureHash): Boolean + /** * Update a previously un-notarised transaction including associated notary signatures. * @param transaction The notarised transaction to be finalized. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index d9149e1f34..7773fdcd22 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -268,6 +268,27 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: } } + override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { + return database.transaction { + val session = currentDBSession() + val criteriaBuilder = session.criteriaBuilder + val delete = criteriaBuilder.createCriteriaDelete(DBTransaction::class.java) + val root = delete.from(DBTransaction::class.java) + delete.where(criteriaBuilder.and( + criteriaBuilder.equal(root.get(DBTransaction::txId.name), id.toString()), + criteriaBuilder.equal(root.get(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG) + )) + if (session.createQuery(delete).executeUpdate() != 0) { + txStorage.locked { + txStorage.content.clear(id) + txStorage.content[id] + logger.debug { "Un-notarised transaction $id has been removed." } + } + true + } else false + } + } + override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection) = addTransaction(transaction + signatures) { finalizeTransactionWithExtraSignatures(transaction.id, signatures) diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index 27d493b0a4..98de78ac0c 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -2,6 +2,7 @@ package net.corda.node.utilities import com.github.benmanes.caffeine.cache.LoadingCache import com.github.benmanes.caffeine.cache.Weigher +import net.corda.core.crypto.SecureHash import net.corda.core.internal.NamedCacheFactory import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.persistence.DatabaseTransaction @@ -248,6 +249,8 @@ abstract class AppendOnlyPersistentMapBase( cache.invalidateAll() } + fun clear(id: SecureHash) = cache.invalidate(id) + // Helpers to know if transaction(s) are currently writing the given key. private fun weAreWriting(key: K): Boolean = pendingKeys[key]?.transactions?.contains(contextTransaction) ?: false diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index cddecab98a..7e3ed4f2f9 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -809,6 +809,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { return true } + override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { + return database.transaction { + delegate.removeUnnotarisedTransaction(id) + } + } + override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection) : Boolean { database.transaction { delegate.finalizeTransactionWithExtraSignatures(transaction, signatures) diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index fd086f6ff9..16889e886e 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -43,6 +43,7 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import rx.plugins.RxJavaHooks +import java.lang.AssertionError import java.security.KeyPair import java.time.Clock import java.time.Instant @@ -50,6 +51,7 @@ import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import kotlin.concurrent.thread import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertFalse import kotlin.test.assertNull @@ -159,6 +161,21 @@ class DBTransactionStorageTests { } } + @Test(timeout = 300_000) + fun `remove un-notarised transaction`() { + val now = Instant.ofEpochSecond(333444555L) + val transactionClock = TransactionClock(now) + newTransactionStorage(clock = transactionClock) + val transaction = newTransaction(notarySig = false) + transactionStorage.addUnnotarisedTransaction(transaction) + assertNull(transactionStorage.getTransaction(transaction.id)) + assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) + + assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id)) + assertFailsWith { readTransactionFromDB(transaction.id).status } + assertNull(transactionStorage.getTransactionInternal(transaction.id)) + } + @Test(timeout = 300_000) fun `finalize unverified transaction and verify no additional signatures are added`() { val now = Instant.ofEpochSecond(333444555L) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/flows/FlowTestsUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/flows/FlowTestsUtils.kt index 78dd3a7035..bcbfdf85bc 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/flows/FlowTestsUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/flows/FlowTestsUtils.kt @@ -1,6 +1,7 @@ package net.corda.testing.flows import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.Strand import net.corda.core.concurrent.CordaFuture import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession @@ -8,6 +9,7 @@ import net.corda.core.toFuture import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.unwrap import net.corda.node.internal.InitiatedFlowFactory +import net.corda.testing.driver.NodeHandle import net.corda.testing.node.internal.TestStartedNode import rx.Observable import kotlin.reflect.KClass @@ -95,4 +97,13 @@ fun > TestStartedNode.registerCoreFlowFactory(initiatingFlowCla initiatedFlowClass: Class, flowFactory: (FlowSession) -> T, track: Boolean): Observable { return this.internals.registerInitiatedFlowFactory(initiatingFlowClass, initiatedFlowClass, InitiatedFlowFactory.Core(flowFactory), track) +} + +fun waitForAllFlowsToComplete(nodeHandle: NodeHandle, maxIterations: Int = 60, iterationDelay: Long = 500) { + repeat((0..maxIterations).count()) { + if (nodeHandle.rpc.stateMachinesSnapshot().isEmpty()) { + return + } + Strand.sleep(iterationDelay) + } } \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt index c54dceba55..e0beb1ec00 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt @@ -59,6 +59,10 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.MISSING_NOTARY_SIG)) == null } + override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { + return txns.remove(id) != null + } + override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection): Boolean { val current = txns.replace(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED)) return if (current != null) { diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt index 983ca29b8f..0a63b813b1 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt @@ -141,6 +141,8 @@ data class TestTransactionDSLInterpreter private constructor( override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {} + override fun removeUnnotarisedTransaction(id: SecureHash) {} + override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection, statesToRecord: StatesToRecord) {} }