From bc718088febb5248a398a4e04d93c7bc919f9774 Mon Sep 17 00:00:00 2001 From: Jose Coll Date: Tue, 24 Oct 2023 10:38:46 +0100 Subject: [PATCH] ENT-10100 Changes required to support recovery of IN_FLIGHT transactions. (#7541) --- .../coretests/flows/FinalityFlowTests.kt | 50 +++++++++---------- .../corda/core/flows/SendTransactionFlow.kt | 1 + .../net/corda/core/internal/FetchDataFlow.kt | 14 ++++-- .../core/internal/ResolveTransactionsFlow.kt | 8 ++- .../core/internal/ServiceHubCoreInternal.kt | 2 +- .../core/node/services/TransactionStorage.kt | 17 +++++++ .../node/services/DbTransactionsResolver.kt | 10 ++-- .../node/services/api/ServiceHubInternal.kt | 7 --- .../persistence/DBTransactionStorage.kt | 12 ++--- .../node/messaging/TwoPartyTradeFlowTests.kt | 12 ++--- ...DBTransactionStorageLedgerRecoveryTests.kt | 4 +- .../persistence/DBTransactionStorageTests.kt | 2 +- .../node/internal/MockTransactionStorage.kt | 5 +- 13 files changed, 84 insertions(+), 60 deletions(-) diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index 8048c99dbd..562cc3a4b4 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -182,9 +182,9 @@ class FinalityFlowTests : WithFinality { val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() - val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) - val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusBob) try { @@ -192,10 +192,10 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId)) // Note: double spend error not propagated to peers by default (corDapp PV = 3) // Un-notarised txn clean-up occurs in ReceiveFinalityFlow upon receipt of UnexpectedFlowEndException - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) } } @@ -207,9 +207,9 @@ class FinalityFlowTests : WithFinality { val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() - val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) - val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusBob) try { @@ -217,9 +217,9 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) - assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(bobNode, stxId) } @@ -228,9 +228,9 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) - val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() + val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionWithStatus(stxId) ?: fail() assertEquals(TransactionStatus.IN_FLIGHT, txnStatus) } } @@ -252,9 +252,9 @@ class FinalityFlowTests : WithFinality { val ref = bobNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val stx = bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow() - val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) - val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusBob) try { @@ -262,9 +262,9 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(bobNode, stxId) - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) } } @@ -276,9 +276,9 @@ class FinalityFlowTests : WithFinality { val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() - val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) - val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusBob) try { @@ -286,9 +286,9 @@ class FinalityFlowTests : WithFinality { } catch (e: NotaryException) { val stxId = (e.error as NotaryError.Conflict).txId - assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) - assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) + assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId)) assertTxnRemovedFromDatabase(bobNode, stxId) } } @@ -300,9 +300,9 @@ class FinalityFlowTests : WithFinality { val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val notarisedStxn1 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() - val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() + val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) - val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail() assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob) // now lets attempt a new spend with the new output of the previous transaction @@ -311,17 +311,17 @@ class FinalityFlowTests : WithFinality { // the original transaction is now finalised at Bob (despite the original flow not completing) because Bob resolved the // original transaction from Alice in the second transaction (and Alice had already notarised and finalised the original transaction) - val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() + val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusBobAgain) - val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() + val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn2.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2) - val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() + val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn2.id) ?: fail() assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob2) // Validate attempt at flow finalisation by Bob has no effect on outcome. val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow() - val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionInternal(finaliseStxn1.id) ?: fail() + val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionWithStatus(finaliseStxn1.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusBobYetAgain) } @@ -335,7 +335,7 @@ class FinalityFlowTests : WithFinality { } catch (e: UnexpectedFlowEndException) { val stxId = SecureHash.parse(e.message) - val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() + val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stxId) ?: fail() assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob) } } diff --git a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt index d10bbe0ebb..9f57de9580 100644 --- a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt @@ -221,6 +221,7 @@ open class DataVendingFlow(val otherSessions: Set, val payload: Any numSent++ tx } + FetchDataFlow.DataType.TRANSACTION_RECOVERY -> NotImplementedError("Enterprise only feature") // Loop on all items returned using dataRequest.hashes.map: FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId -> if (!authorisedTransactions.isAuthorised(txId)) { diff --git a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt index 8d69db366f..1479ab4896 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt @@ -12,6 +12,7 @@ import net.corda.core.flows.MaybeSerializedSignedTransaction import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch import net.corda.core.internal.FetchDataFlow.HashNotFound import net.corda.core.node.NetworkParameters +import net.corda.core.node.services.SignedTransactionWithStatus import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializationTransformEnumDefault import net.corda.core.serialization.CordaSerializationTransformEnumDefaults @@ -82,7 +83,7 @@ sealed class FetchDataFlow( ) @CordaSerializable enum class DataType { - TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN + TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN, TRANSACTION_RECOVERY } @Suspendable @@ -267,12 +268,19 @@ class FetchAttachmentsFlow(requests: Set, * Authorisation is accorded only on valid ancestors of the root transaction. * Note that returned transactions are not inserted into the database, because it's up to the caller to actually verify the transactions are valid. */ -class FetchTransactionsFlow(requests: Set, otherSide: FlowSession) : - FetchDataFlow(requests, otherSide, DataType.TRANSACTION) { +class FetchTransactionsFlow @JvmOverloads constructor(requests: Set, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION) : + FetchDataFlow(requests, otherSide, dataType) { override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid) } +// Used by Enterprise Ledger Recovery +class FetchRecoverableTransactionsFlow @JvmOverloads constructor(requests: Set, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION_RECOVERY) : + FetchDataFlow(requests, otherSide, dataType) { + + override fun load(txid: SecureHash): SignedTransactionWithStatus? = serviceHub.validatedTransactions.getTransactionWithStatus(txid) +} + class FetchBatchTransactionsFlow(requests: Set, otherSide: FlowSession) : FetchDataFlow(requests, otherSide, DataType.BATCH_TRANSACTION) { diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt index 053015fef9..7b8f248667 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt @@ -20,7 +20,8 @@ class ResolveTransactionsFlow private constructor( val txHashes: Set, val otherSide: FlowSession, val statesToRecord: StatesToRecord, - val deferredAck: Boolean = false + val deferredAck: Boolean = false, + val recoveryMode: Boolean = false ) : FlowLogic() { constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE) @@ -29,6 +30,9 @@ class ResolveTransactionsFlow private constructor( constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean) : this(null, txHashes, otherSide, statesToRecord, deferredAck) + constructor(txHashes: Set, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean, recoveryMode: Boolean ) + : this(null, txHashes, otherSide, statesToRecord, deferredAck, recoveryMode) + /** * Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does * *not* validate or store the [SignedTransaction] itself. @@ -63,7 +67,7 @@ class ResolveTransactionsFlow private constructor( } val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this) - resolver.downloadDependencies(batchMode) + resolver.downloadDependencies(batchMode, recoveryMode) if (!deferredAck) { logger.trace { "ResolveTransactionsFlow: Sending END." } diff --git a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt index d752eb3b15..bd7c1142ac 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt @@ -85,7 +85,7 @@ interface ServiceHubCoreInternal : ServiceHub { interface TransactionsResolver { @Suspendable - fun downloadDependencies(batchMode: Boolean) + fun downloadDependencies(batchMode: Boolean, recoveryMode: Boolean) @Suspendable fun recordDependencies(usedStatesToRecord: StatesToRecord) diff --git a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt index b04c96729f..b4875040e0 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/TransactionStorage.kt @@ -2,8 +2,11 @@ package net.corda.core.node.services import net.corda.core.DoNotImplement import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.NamedByHash import net.corda.core.crypto.SecureHash +import net.corda.core.flows.TransactionStatus import net.corda.core.messaging.DataFeed +import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import rx.Observable @@ -17,6 +20,11 @@ interface TransactionStorage { */ fun getTransaction(id: SecureHash): SignedTransaction? + /** + * Return the transaction with its status for the given [id], or null if no such transaction exists. + */ + fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? + /** * Get a synchronous Observable of updates. When observations are pushed to the Observer, the vault will already * incorporate the update. @@ -32,4 +40,13 @@ interface TransactionStorage { * Returns a future that completes with the transaction corresponding to [id] once it has been committed */ fun trackTransaction(id: SecureHash): CordaFuture +} + +@CordaSerializable +data class SignedTransactionWithStatus( + val stx: SignedTransaction, + val status: TransactionStatus +) : NamedByHash { + override val id: SecureHash + get() = stx.id } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt index 3d737ea422..e82549d8e3 100644 --- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt +++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt @@ -11,8 +11,8 @@ import net.corda.core.internal.dependencies import net.corda.core.node.StatesToRecord import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.debug -import net.corda.core.utilities.trace import net.corda.core.utilities.seconds +import net.corda.core.utilities.trace import net.corda.node.services.api.WritableTransactionStorage import java.util.* @@ -21,7 +21,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa private val logger = flow.logger @Suspendable - override fun downloadDependencies(batchMode: Boolean) { + override fun downloadDependencies(batchMode: Boolean, recoveryMode: Boolean) { + if (recoveryMode) throw NotImplementedError("Enterprise only Ledger Recovery feature") logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" } val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage @@ -99,13 +100,12 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa override fun recordDependencies(usedStatesToRecord: StatesToRecord) { val sortedDependencies = checkNotNull(this.sortedDependencies) logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" } - val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage for (txId in sortedDependencies) { // Retrieve and delete the transaction from the unverified store. - val (tx, txStatus) = checkNotNull(transactionStorage.getTransactionInternal(txId)) { + val (tx, txStatus) = checkNotNull(flow.serviceHub.validatedTransactions.getTransactionWithStatus(txId)) { "Somehow the unverified transaction ($txId) that we stored previously is no longer there." } - if (txStatus != TransactionStatus.VERIFIED) { + if (txStatus == TransactionStatus.UNVERIFIED) { tx.verify(flow.serviceHub) flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx)) } else { diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 9ed76b15c8..0ee732b210 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -7,7 +7,6 @@ import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.FlowLogic import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.StateMachineRunId -import net.corda.core.flows.TransactionStatus import net.corda.core.identity.CordaX500Name import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.NamedCacheFactory @@ -416,12 +415,6 @@ interface WritableTransactionStorage : TransactionStorage { */ fun addUnverifiedTransaction(transaction: SignedTransaction) - /** - * Return the transaction with the given ID from the store, and its associated [TransactionStatus]. - * Returns null if no transaction with the ID exists. - */ - fun getTransactionInternal(id: SecureHash): Pair? - /** * Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside * a DB transaction. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 6905d6f7c1..8577afe934 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -11,6 +11,7 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.concurrent.doneFuture import net.corda.core.messaging.DataFeed +import net.corda.core.node.services.SignedTransactionWithStatus import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes @@ -314,6 +315,11 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac } } + override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? = + database.transaction { + txStorage.content[id]?.let { SignedTransactionWithStatus(it.toSignedTx(), it.status.toTransactionStatus()) } + } + override fun addUnverifiedTransaction(transaction: SignedTransaction) { if (transaction.coreTransaction is WireTransaction) transaction.verifyRequiredSignatures() @@ -335,12 +341,6 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac } } - override fun getTransactionInternal(id: SecureHash): Pair? { - return database.transaction { - txStorage.content[id]?.let { it.toSignedTx() to it.status.toTransactionStatus() } - } - } - private val updatesPublisher = PublishSubject.create().toSerialized() override val updates: Observable = updatesPublisher.wrapWithDatabaseTransaction() diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 451bc813a9..2bdfc69e5f 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -17,11 +17,10 @@ import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession -import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.StateMachineRunId -import net.corda.core.flows.TransactionStatus +import net.corda.core.flows.TransactionMetadata import net.corda.core.identity.AbstractParty import net.corda.core.identity.AnonymousParty import net.corda.core.identity.CordaX500Name @@ -31,6 +30,7 @@ import net.corda.core.internal.concurrent.map import net.corda.core.internal.rootCause import net.corda.core.messaging.DataFeed import net.corda.core.messaging.StateMachineTransactionMapping +import net.corda.core.node.services.SignedTransactionWithStatus import net.corda.core.node.services.Vault import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken @@ -44,12 +44,12 @@ import net.corda.core.utilities.toNonEmptySet import net.corda.core.utilities.unwrap import net.corda.coretesting.internal.TEST_TX_TIME import net.corda.finance.DOLLARS -import net.corda.finance.`issued by` import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.asset.CASH import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.TwoPartyTradeFlow.Buyer import net.corda.finance.flows.TwoPartyTradeFlow.Seller +import net.corda.finance.`issued by` import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage @@ -857,12 +857,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { } } - override fun getTransactionInternal(id: SecureHash): Pair? { + override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? { return database.transaction { - delegate.getTransactionInternal(id) + records.add(TxRecord.Get(id)) + delegate.getTransactionWithStatus(id) } } - } interface TxRecord { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index c4a98b1093..9cdf700611 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -308,7 +308,7 @@ class DBTransactionStorageLedgerRecoveryTests { assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(senderTransaction.id)) assertFailsWith { readTransactionFromDB(senderTransaction.id).status } assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size) - assertNull(transactionRecovery.getTransactionInternal(senderTransaction.id)) + assertNull(transactionRecovery.getTransactionWithStatus(senderTransaction.id)) val receiverTransaction = newTransaction(notarySig = false) transactionRecovery.addUnnotarisedTransaction(receiverTransaction) @@ -322,7 +322,7 @@ class DBTransactionStorageLedgerRecoveryTests { assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(receiverTransaction.id)) assertFailsWith { readTransactionFromDB(receiverTransaction.id).status } assertFailsWith { readReceiverDistributionRecordFromDB(receiverTransaction.id) } - assertNull(transactionRecovery.getTransactionInternal(receiverTransaction.id)) + assertNull(transactionRecovery.getTransactionWithStatus(receiverTransaction.id)) } @Test(timeout = 300_000) diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 0d4b380375..2d54442556 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -170,7 +170,7 @@ class DBTransactionStorageTests { assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id)) assertFailsWith { readTransactionFromDB(transaction.id).status } - assertNull(transactionStorage.getTransactionInternal(transaction.id)) + assertNull(transactionStorage.getTransactionWithStatus(transaction.id)) } @Test(timeout = 300_000) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt index 0a52c09f56..d09af76ca2 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt @@ -12,6 +12,7 @@ import net.corda.node.services.api.WritableTransactionStorage import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionStatus import net.corda.core.identity.CordaX500Name +import net.corda.core.node.services.SignedTransactionWithStatus import net.corda.testing.node.MockServices import rx.Observable import rx.subjects.PublishSubject @@ -86,9 +87,9 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.UNVERIFIED)) } - override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.status == TransactionStatus.VERIFIED) it.stx else null } + override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.isVerified) it.stx else null } - override fun getTransactionInternal(id: SecureHash): Pair? = txns[id]?.let { Pair(it.stx, it.status) } + override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? = txns[id]?.let { SignedTransactionWithStatus(it.stx, it.status) } private class TxHolder(val stx: SignedTransaction, var status: TransactionStatus) { val isVerified = status == TransactionStatus.VERIFIED