Merging forward updates from release/os/4.11 to release/os/4.12 - 2023-10-24

This commit is contained in:
r3-build 2023-10-24 09:42:55 +00:00
commit 9d452f0003
13 changed files with 84 additions and 60 deletions

View File

@ -182,9 +182,9 @@ class FinalityFlowTests : WithFinality {
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusBob) assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
try { try {
@ -192,10 +192,10 @@ class FinalityFlowTests : WithFinality {
} }
catch (e: NotaryException) { catch (e: NotaryException) {
val stxId = (e.error as NotaryError.Conflict).txId val stxId = (e.error as NotaryError.Conflict).txId
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
// Note: double spend error not propagated to peers by default (corDapp PV = 3) // Note: double spend error not propagated to peers by default (corDapp PV = 3)
// Un-notarised txn clean-up occurs in ReceiveFinalityFlow upon receipt of UnexpectedFlowEndException // Un-notarised txn clean-up occurs in ReceiveFinalityFlow upon receipt of UnexpectedFlowEndException
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId) assertTxnRemovedFromDatabase(aliceNode, stxId)
} }
} }
@ -207,9 +207,9 @@ class FinalityFlowTests : WithFinality {
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusBob) assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
try { try {
@ -217,9 +217,9 @@ class FinalityFlowTests : WithFinality {
} }
catch (e: NotaryException) { catch (e: NotaryException) {
val stxId = (e.error as NotaryError.Conflict).txId val stxId = (e.error as NotaryError.Conflict).txId
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId) assertTxnRemovedFromDatabase(aliceNode, stxId)
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(bobNode, stxId) assertTxnRemovedFromDatabase(bobNode, stxId)
} }
@ -228,9 +228,9 @@ class FinalityFlowTests : WithFinality {
} }
catch (e: NotaryException) { catch (e: NotaryException) {
val stxId = (e.error as NotaryError.Conflict).txId val stxId = (e.error as NotaryError.Conflict).txId
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId) assertTxnRemovedFromDatabase(aliceNode, stxId)
val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionWithStatus(stxId) ?: fail()
assertEquals(TransactionStatus.IN_FLIGHT, txnStatus) assertEquals(TransactionStatus.IN_FLIGHT, txnStatus)
} }
} }
@ -252,9 +252,9 @@ class FinalityFlowTests : WithFinality {
val ref = bobNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val ref = bobNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
val stx = bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow() val stx = bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusBob) assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
try { try {
@ -262,9 +262,9 @@ class FinalityFlowTests : WithFinality {
} }
catch (e: NotaryException) { catch (e: NotaryException) {
val stxId = (e.error as NotaryError.Conflict).txId val stxId = (e.error as NotaryError.Conflict).txId
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(bobNode, stxId) assertTxnRemovedFromDatabase(bobNode, stxId)
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId) assertTxnRemovedFromDatabase(aliceNode, stxId)
} }
} }
@ -276,9 +276,9 @@ class FinalityFlowTests : WithFinality {
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusBob) assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
try { try {
@ -286,9 +286,9 @@ class FinalityFlowTests : WithFinality {
} }
catch (e: NotaryException) { catch (e: NotaryException) {
val stxId = (e.error as NotaryError.Conflict).txId val stxId = (e.error as NotaryError.Conflict).txId
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId) assertTxnRemovedFromDatabase(aliceNode, stxId)
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId))
assertTxnRemovedFromDatabase(bobNode, stxId) assertTxnRemovedFromDatabase(bobNode, stxId)
} }
} }
@ -300,9 +300,9 @@ class FinalityFlowTests : WithFinality {
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow() val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
val notarisedStxn1 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow() val notarisedStxn1 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail()
assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob) assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob)
// now lets attempt a new spend with the new output of the previous transaction // now lets attempt a new spend with the new output of the previous transaction
@ -311,17 +311,17 @@ class FinalityFlowTests : WithFinality {
// the original transaction is now finalised at Bob (despite the original flow not completing) because Bob resolved the // the original transaction is now finalised at Bob (despite the original flow not completing) because Bob resolved the
// original transaction from Alice in the second transaction (and Alice had already notarised and finalised the original transaction) // original transaction from Alice in the second transaction (and Alice had already notarised and finalised the original transaction)
val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusBobAgain) assertEquals(TransactionStatus.VERIFIED, txnStatusBobAgain)
val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn2.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2)
val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn2.id) ?: fail()
assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob2) assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob2)
// Validate attempt at flow finalisation by Bob has no effect on outcome. // Validate attempt at flow finalisation by Bob has no effect on outcome.
val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow() val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow()
val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionInternal(finaliseStxn1.id) ?: fail() val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionWithStatus(finaliseStxn1.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusBobYetAgain) assertEquals(TransactionStatus.VERIFIED, txnStatusBobYetAgain)
} }
@ -335,7 +335,7 @@ class FinalityFlowTests : WithFinality {
} }
catch (e: UnexpectedFlowEndException) { catch (e: UnexpectedFlowEndException) {
val stxId = SecureHash.parse(e.message) val stxId = SecureHash.parse(e.message)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stxId) ?: fail()
assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob) assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob)
} }
} }

View File

@ -221,6 +221,7 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
numSent++ numSent++
tx tx
} }
FetchDataFlow.DataType.TRANSACTION_RECOVERY -> NotImplementedError("Enterprise only feature")
// Loop on all items returned using dataRequest.hashes.map: // Loop on all items returned using dataRequest.hashes.map:
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId -> FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
if (!authorisedTransactions.isAuthorised(txId)) { if (!authorisedTransactions.isAuthorised(txId)) {

View File

@ -12,6 +12,7 @@ import net.corda.core.flows.MaybeSerializedSignedTransaction
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
import net.corda.core.internal.FetchDataFlow.HashNotFound import net.corda.core.internal.FetchDataFlow.HashNotFound
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.CordaSerializationTransformEnumDefault import net.corda.core.serialization.CordaSerializationTransformEnumDefault
import net.corda.core.serialization.CordaSerializationTransformEnumDefaults import net.corda.core.serialization.CordaSerializationTransformEnumDefaults
@ -82,7 +83,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
) )
@CordaSerializable @CordaSerializable
enum class DataType { enum class DataType {
TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN, TRANSACTION_RECOVERY
} }
@Suspendable @Suspendable
@ -267,12 +268,19 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
* Authorisation is accorded only on valid ancestors of the root transaction. * Authorisation is accorded only on valid ancestors of the root transaction.
* Note that returned transactions are not inserted into the database, because it's up to the caller to actually verify the transactions are valid. * Note that returned transactions are not inserted into the database, because it's up to the caller to actually verify the transactions are valid.
*/ */
class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) : class FetchTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION) :
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, DataType.TRANSACTION) { FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, dataType) {
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid) override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
} }
// Used by Enterprise Ledger Recovery
class FetchRecoverableTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION_RECOVERY) :
FetchDataFlow<SignedTransactionWithStatus, SignedTransactionWithStatus>(requests, otherSide, dataType) {
override fun load(txid: SecureHash): SignedTransactionWithStatus? = serviceHub.validatedTransactions.getTransactionWithStatus(txid)
}
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) : class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) { FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) {

View File

@ -20,7 +20,8 @@ class ResolveTransactionsFlow private constructor(
val txHashes: Set<SecureHash>, val txHashes: Set<SecureHash>,
val otherSide: FlowSession, val otherSide: FlowSession,
val statesToRecord: StatesToRecord, val statesToRecord: StatesToRecord,
val deferredAck: Boolean = false val deferredAck: Boolean = false,
val recoveryMode: Boolean = false
) : FlowLogic<Unit>() { ) : FlowLogic<Unit>() {
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE) constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
@ -29,6 +30,9 @@ class ResolveTransactionsFlow private constructor(
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean) constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean)
: this(null, txHashes, otherSide, statesToRecord, deferredAck) : this(null, txHashes, otherSide, statesToRecord, deferredAck)
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean, recoveryMode: Boolean )
: this(null, txHashes, otherSide, statesToRecord, deferredAck, recoveryMode)
/** /**
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does * Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
* *not* validate or store the [SignedTransaction] itself. * *not* validate or store the [SignedTransaction] itself.
@ -63,7 +67,7 @@ class ResolveTransactionsFlow private constructor(
} }
val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this) val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
resolver.downloadDependencies(batchMode) resolver.downloadDependencies(batchMode, recoveryMode)
if (!deferredAck) { if (!deferredAck) {
logger.trace { "ResolveTransactionsFlow: Sending END." } logger.trace { "ResolveTransactionsFlow: Sending END." }

View File

@ -85,7 +85,7 @@ interface ServiceHubCoreInternal : ServiceHub {
interface TransactionsResolver { interface TransactionsResolver {
@Suspendable @Suspendable
fun downloadDependencies(batchMode: Boolean) fun downloadDependencies(batchMode: Boolean, recoveryMode: Boolean)
@Suspendable @Suspendable
fun recordDependencies(usedStatesToRecord: StatesToRecord) fun recordDependencies(usedStatesToRecord: StatesToRecord)

View File

@ -2,8 +2,11 @@ package net.corda.core.node.services
import net.corda.core.DoNotImplement import net.corda.core.DoNotImplement
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.NamedByHash
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.TransactionStatus
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import rx.Observable import rx.Observable
@ -17,6 +20,11 @@ interface TransactionStorage {
*/ */
fun getTransaction(id: SecureHash): SignedTransaction? fun getTransaction(id: SecureHash): SignedTransaction?
/**
* Return the transaction with its status for the given [id], or null if no such transaction exists.
*/
fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus?
/** /**
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the vault will already * Get a synchronous Observable of updates. When observations are pushed to the Observer, the vault will already
* incorporate the update. * incorporate the update.
@ -33,3 +41,12 @@ interface TransactionStorage {
*/ */
fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction>
} }
@CordaSerializable
data class SignedTransactionWithStatus(
val stx: SignedTransaction,
val status: TransactionStatus
) : NamedByHash {
override val id: SecureHash
get() = stx.id
}

View File

@ -11,8 +11,8 @@ import net.corda.core.internal.dependencies
import net.corda.core.node.StatesToRecord import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.core.utilities.trace
import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.api.WritableTransactionStorage
import java.util.* import java.util.*
@ -21,7 +21,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
private val logger = flow.logger private val logger = flow.logger
@Suspendable @Suspendable
override fun downloadDependencies(batchMode: Boolean) { override fun downloadDependencies(batchMode: Boolean, recoveryMode: Boolean) {
if (recoveryMode) throw NotImplementedError("Enterprise only Ledger Recovery feature")
logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" } logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" }
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
@ -99,13 +100,12 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
override fun recordDependencies(usedStatesToRecord: StatesToRecord) { override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
val sortedDependencies = checkNotNull(this.sortedDependencies) val sortedDependencies = checkNotNull(this.sortedDependencies)
logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" } logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
for (txId in sortedDependencies) { for (txId in sortedDependencies) {
// Retrieve and delete the transaction from the unverified store. // Retrieve and delete the transaction from the unverified store.
val (tx, txStatus) = checkNotNull(transactionStorage.getTransactionInternal(txId)) { val (tx, txStatus) = checkNotNull(flow.serviceHub.validatedTransactions.getTransactionWithStatus(txId)) {
"Somehow the unverified transaction ($txId) that we stored previously is no longer there." "Somehow the unverified transaction ($txId) that we stored previously is no longer there."
} }
if (txStatus != TransactionStatus.VERIFIED) { if (txStatus == TransactionStatus.UNVERIFIED) {
tx.verify(flow.serviceHub) tx.verify(flow.serviceHub)
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx)) flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
} else { } else {

View File

@ -7,7 +7,6 @@ import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.TransactionStatus
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.NamedCacheFactory
@ -416,12 +415,6 @@ interface WritableTransactionStorage : TransactionStorage {
*/ */
fun addUnverifiedTransaction(transaction: SignedTransaction) fun addUnverifiedTransaction(transaction: SignedTransaction)
/**
* Return the transaction with the given ID from the store, and its associated [TransactionStatus].
* Returns null if no transaction with the ID exists.
*/
fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>?
/** /**
* Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside * Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside
* a DB transaction. * a DB transaction.

View File

@ -11,6 +11,7 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
@ -314,6 +315,11 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
} }
} }
override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? =
database.transaction {
txStorage.content[id]?.let { SignedTransactionWithStatus(it.toSignedTx(), it.status.toTransactionStatus()) }
}
override fun addUnverifiedTransaction(transaction: SignedTransaction) { override fun addUnverifiedTransaction(transaction: SignedTransaction) {
if (transaction.coreTransaction is WireTransaction) if (transaction.coreTransaction is WireTransaction)
transaction.verifyRequiredSignatures() transaction.verifyRequiredSignatures()
@ -335,12 +341,6 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
} }
} }
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, net.corda.core.flows.TransactionStatus>? {
return database.transaction {
txStorage.content[id]?.let { it.toSignedTx() to it.status.toTransactionStatus() }
}
}
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized() private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction() override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()

View File

@ -17,11 +17,10 @@ import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.TransactionStatus import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.AbstractParty import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
@ -31,6 +30,7 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.internal.rootCause import net.corda.core.internal.rootCause
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -44,12 +44,12 @@ import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import net.corda.coretesting.internal.TEST_TX_TIME import net.corda.coretesting.internal.TEST_TX_TIME
import net.corda.finance.DOLLARS import net.corda.finance.DOLLARS
import net.corda.finance.`issued by`
import net.corda.finance.contracts.CommercialPaper import net.corda.finance.contracts.CommercialPaper
import net.corda.finance.contracts.asset.CASH import net.corda.finance.contracts.asset.CASH
import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Seller import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.finance.`issued by`
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
@ -857,12 +857,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
} }
} }
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? { override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? {
return database.transaction { return database.transaction {
delegate.getTransactionInternal(id) records.add(TxRecord.Get(id))
delegate.getTransactionWithStatus(id)
} }
} }
} }
interface TxRecord { interface TxRecord {

View File

@ -308,7 +308,7 @@ class DBTransactionStorageLedgerRecoveryTests {
assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(senderTransaction.id)) assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(senderTransaction.id))
assertFailsWith<AssertionError> { readTransactionFromDB(senderTransaction.id).status } assertFailsWith<AssertionError> { readTransactionFromDB(senderTransaction.id).status }
assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size) assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size)
assertNull(transactionRecovery.getTransactionInternal(senderTransaction.id)) assertNull(transactionRecovery.getTransactionWithStatus(senderTransaction.id))
val receiverTransaction = newTransaction(notarySig = false) val receiverTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction) transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
@ -322,7 +322,7 @@ class DBTransactionStorageLedgerRecoveryTests {
assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(receiverTransaction.id)) assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(receiverTransaction.id))
assertFailsWith<AssertionError> { readTransactionFromDB(receiverTransaction.id).status } assertFailsWith<AssertionError> { readTransactionFromDB(receiverTransaction.id).status }
assertFailsWith<AssertionError> { readReceiverDistributionRecordFromDB(receiverTransaction.id) } assertFailsWith<AssertionError> { readReceiverDistributionRecordFromDB(receiverTransaction.id) }
assertNull(transactionRecovery.getTransactionInternal(receiverTransaction.id)) assertNull(transactionRecovery.getTransactionWithStatus(receiverTransaction.id))
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)

View File

@ -170,7 +170,7 @@ class DBTransactionStorageTests {
assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id)) assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id))
assertFailsWith<AssertionError> { readTransactionFromDB(transaction.id).status } assertFailsWith<AssertionError> { readTransactionFromDB(transaction.id).status }
assertNull(transactionStorage.getTransactionInternal(transaction.id)) assertNull(transactionStorage.getTransactionWithStatus(transaction.id))
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)

View File

@ -12,6 +12,7 @@ import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus import net.corda.core.flows.TransactionStatus
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.node.services.SignedTransactionWithStatus
import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -86,9 +87,9 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.UNVERIFIED)) txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.UNVERIFIED))
} }
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.status == TransactionStatus.VERIFIED) it.stx else null } override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.isVerified) it.stx else null }
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? = txns[id]?.let { Pair(it.stx, it.status) } override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? = txns[id]?.let { SignedTransactionWithStatus(it.stx, it.status) }
private class TxHolder(val stx: SignedTransaction, var status: TransactionStatus) { private class TxHolder(val stx: SignedTransaction, var status: TransactionStatus) {
val isVerified = status == TransactionStatus.VERIFIED val isVerified = status == TransactionStatus.VERIFIED