mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
ENT-10100 Changes required to support recovery of IN_FLIGHT transactions. (#7541)
This commit is contained in:
parent
7556b9a432
commit
bc718088fe
@ -182,9 +182,9 @@ class FinalityFlowTests : WithFinality {
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
@ -192,10 +192,10 @@ class FinalityFlowTests : WithFinality {
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
// Note: double spend error not propagated to peers by default (corDapp PV = 3)
|
||||
// Un-notarised txn clean-up occurs in ReceiveFinalityFlow upon receipt of UnexpectedFlowEndException
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(aliceNode, stxId)
|
||||
}
|
||||
}
|
||||
@ -207,9 +207,9 @@ class FinalityFlowTests : WithFinality {
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
@ -217,9 +217,9 @@ class FinalityFlowTests : WithFinality {
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(aliceNode, stxId)
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(bobNode, stxId)
|
||||
}
|
||||
|
||||
@ -228,9 +228,9 @@ class FinalityFlowTests : WithFinality {
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(aliceNode, stxId)
|
||||
val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
|
||||
val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionWithStatus(stxId) ?: fail()
|
||||
assertEquals(TransactionStatus.IN_FLIGHT, txnStatus)
|
||||
}
|
||||
}
|
||||
@ -252,9 +252,9 @@ class FinalityFlowTests : WithFinality {
|
||||
val ref = bobNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
@ -262,9 +262,9 @@ class FinalityFlowTests : WithFinality {
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(bobNode, stxId)
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(aliceNode, stxId)
|
||||
}
|
||||
}
|
||||
@ -276,9 +276,9 @@ class FinalityFlowTests : WithFinality {
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
@ -286,9 +286,9 @@ class FinalityFlowTests : WithFinality {
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(aliceNode, stxId)
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionWithStatus(stxId))
|
||||
assertTxnRemovedFromDatabase(bobNode, stxId)
|
||||
}
|
||||
}
|
||||
@ -300,9 +300,9 @@ class FinalityFlowTests : WithFinality {
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val notarisedStxn1 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob)
|
||||
|
||||
// now lets attempt a new spend with the new output of the previous transaction
|
||||
@ -311,17 +311,17 @@ class FinalityFlowTests : WithFinality {
|
||||
|
||||
// the original transaction is now finalised at Bob (despite the original flow not completing) because Bob resolved the
|
||||
// original transaction from Alice in the second transaction (and Alice had already notarised and finalised the original transaction)
|
||||
val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
|
||||
val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBobAgain)
|
||||
|
||||
val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
|
||||
val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn2.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2)
|
||||
val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
|
||||
val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionWithStatus(notarisedStxn2.id) ?: fail()
|
||||
assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob2)
|
||||
|
||||
// Validate attempt at flow finalisation by Bob has no effect on outcome.
|
||||
val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow()
|
||||
val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionInternal(finaliseStxn1.id) ?: fail()
|
||||
val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionWithStatus(finaliseStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBobYetAgain)
|
||||
}
|
||||
|
||||
@ -335,7 +335,7 @@ class FinalityFlowTests : WithFinality {
|
||||
}
|
||||
catch (e: UnexpectedFlowEndException) {
|
||||
val stxId = SecureHash.parse(e.message)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionWithStatus(stxId) ?: fail()
|
||||
assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob)
|
||||
}
|
||||
}
|
||||
|
@ -221,6 +221,7 @@ open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any
|
||||
numSent++
|
||||
tx
|
||||
}
|
||||
FetchDataFlow.DataType.TRANSACTION_RECOVERY -> NotImplementedError("Enterprise only feature")
|
||||
// Loop on all items returned using dataRequest.hashes.map:
|
||||
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
|
||||
if (!authorisedTransactions.isAuthorised(txId)) {
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.flows.MaybeSerializedSignedTransaction
|
||||
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
|
||||
import net.corda.core.internal.FetchDataFlow.HashNotFound
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.services.SignedTransactionWithStatus
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
|
||||
import net.corda.core.serialization.CordaSerializationTransformEnumDefaults
|
||||
@ -82,7 +83,7 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
|
||||
)
|
||||
@CordaSerializable
|
||||
enum class DataType {
|
||||
TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN
|
||||
TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN, TRANSACTION_RECOVERY
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -267,12 +268,19 @@ class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||
* Authorisation is accorded only on valid ancestors of the root transaction.
|
||||
* Note that returned transactions are not inserted into the database, because it's up to the caller to actually verify the transactions are valid.
|
||||
*/
|
||||
class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
|
||||
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, DataType.TRANSACTION) {
|
||||
class FetchTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION) :
|
||||
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, dataType) {
|
||||
|
||||
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
|
||||
}
|
||||
|
||||
// Used by Enterprise Ledger Recovery
|
||||
class FetchRecoverableTransactionsFlow @JvmOverloads constructor(requests: Set<SecureHash>, otherSide: FlowSession, dataType: DataType = DataType.TRANSACTION_RECOVERY) :
|
||||
FetchDataFlow<SignedTransactionWithStatus, SignedTransactionWithStatus>(requests, otherSide, dataType) {
|
||||
|
||||
override fun load(txid: SecureHash): SignedTransactionWithStatus? = serviceHub.validatedTransactions.getTransactionWithStatus(txid)
|
||||
}
|
||||
|
||||
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
|
||||
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) {
|
||||
|
||||
|
@ -20,7 +20,8 @@ class ResolveTransactionsFlow private constructor(
|
||||
val txHashes: Set<SecureHash>,
|
||||
val otherSide: FlowSession,
|
||||
val statesToRecord: StatesToRecord,
|
||||
val deferredAck: Boolean = false
|
||||
val deferredAck: Boolean = false,
|
||||
val recoveryMode: Boolean = false
|
||||
) : FlowLogic<Unit>() {
|
||||
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
|
||||
@ -29,6 +30,9 @@ class ResolveTransactionsFlow private constructor(
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean)
|
||||
: this(null, txHashes, otherSide, statesToRecord, deferredAck)
|
||||
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord, deferredAck: Boolean, recoveryMode: Boolean )
|
||||
: this(null, txHashes, otherSide, statesToRecord, deferredAck, recoveryMode)
|
||||
|
||||
/**
|
||||
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
|
||||
* *not* validate or store the [SignedTransaction] itself.
|
||||
@ -63,7 +67,7 @@ class ResolveTransactionsFlow private constructor(
|
||||
}
|
||||
|
||||
val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
|
||||
resolver.downloadDependencies(batchMode)
|
||||
resolver.downloadDependencies(batchMode, recoveryMode)
|
||||
|
||||
if (!deferredAck) {
|
||||
logger.trace { "ResolveTransactionsFlow: Sending END." }
|
||||
|
@ -85,7 +85,7 @@ interface ServiceHubCoreInternal : ServiceHub {
|
||||
|
||||
interface TransactionsResolver {
|
||||
@Suspendable
|
||||
fun downloadDependencies(batchMode: Boolean)
|
||||
fun downloadDependencies(batchMode: Boolean, recoveryMode: Boolean)
|
||||
|
||||
@Suspendable
|
||||
fun recordDependencies(usedStatesToRecord: StatesToRecord)
|
||||
|
@ -2,8 +2,11 @@ package net.corda.core.node.services
|
||||
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.NamedByHash
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import rx.Observable
|
||||
|
||||
@ -17,6 +20,11 @@ interface TransactionStorage {
|
||||
*/
|
||||
fun getTransaction(id: SecureHash): SignedTransaction?
|
||||
|
||||
/**
|
||||
* Return the transaction with its status for the given [id], or null if no such transaction exists.
|
||||
*/
|
||||
fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus?
|
||||
|
||||
/**
|
||||
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the vault will already
|
||||
* incorporate the update.
|
||||
@ -32,4 +40,13 @@ interface TransactionStorage {
|
||||
* Returns a future that completes with the transaction corresponding to [id] once it has been committed
|
||||
*/
|
||||
fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction>
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class SignedTransactionWithStatus(
|
||||
val stx: SignedTransaction,
|
||||
val status: TransactionStatus
|
||||
) : NamedByHash {
|
||||
override val id: SecureHash
|
||||
get() = stx.id
|
||||
}
|
@ -11,8 +11,8 @@ import net.corda.core.internal.dependencies
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import java.util.*
|
||||
|
||||
@ -21,7 +21,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
private val logger = flow.logger
|
||||
|
||||
@Suspendable
|
||||
override fun downloadDependencies(batchMode: Boolean) {
|
||||
override fun downloadDependencies(batchMode: Boolean, recoveryMode: Boolean) {
|
||||
if (recoveryMode) throw NotImplementedError("Enterprise only Ledger Recovery feature")
|
||||
logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" }
|
||||
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
|
||||
|
||||
@ -99,13 +100,12 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
|
||||
val sortedDependencies = checkNotNull(this.sortedDependencies)
|
||||
logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
|
||||
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
|
||||
for (txId in sortedDependencies) {
|
||||
// Retrieve and delete the transaction from the unverified store.
|
||||
val (tx, txStatus) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
|
||||
val (tx, txStatus) = checkNotNull(flow.serviceHub.validatedTransactions.getTransactionWithStatus(txId)) {
|
||||
"Somehow the unverified transaction ($txId) that we stored previously is no longer there."
|
||||
}
|
||||
if (txStatus != TransactionStatus.VERIFIED) {
|
||||
if (txStatus == TransactionStatus.UNVERIFIED) {
|
||||
tx.verify(flow.serviceHub)
|
||||
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
||||
} else {
|
||||
|
@ -7,7 +7,6 @@ import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.TransactionMetadata
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.FlowStateMachineHandle
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
@ -416,12 +415,6 @@ interface WritableTransactionStorage : TransactionStorage {
|
||||
*/
|
||||
fun addUnverifiedTransaction(transaction: SignedTransaction)
|
||||
|
||||
/**
|
||||
* Return the transaction with the given ID from the store, and its associated [TransactionStatus].
|
||||
* Returns null if no transaction with the ID exists.
|
||||
*/
|
||||
fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>?
|
||||
|
||||
/**
|
||||
* Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside
|
||||
* a DB transaction.
|
||||
|
@ -11,6 +11,7 @@ import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.services.SignedTransactionWithStatus
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
@ -314,6 +315,11 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
|
||||
}
|
||||
}
|
||||
|
||||
override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? =
|
||||
database.transaction {
|
||||
txStorage.content[id]?.let { SignedTransactionWithStatus(it.toSignedTx(), it.status.toTransactionStatus()) }
|
||||
}
|
||||
|
||||
override fun addUnverifiedTransaction(transaction: SignedTransaction) {
|
||||
if (transaction.coreTransaction is WireTransaction)
|
||||
transaction.verifyRequiredSignatures()
|
||||
@ -335,12 +341,6 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
|
||||
}
|
||||
}
|
||||
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, net.corda.core.flows.TransactionStatus>? {
|
||||
return database.transaction {
|
||||
txStorage.content[id]?.let { it.toSignedTx() to it.status.toTransactionStatus() }
|
||||
}
|
||||
}
|
||||
|
||||
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
|
||||
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
|
||||
|
||||
|
@ -17,11 +17,10 @@ import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.TransactionMetadata
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.flows.TransactionMetadata
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -31,6 +30,7 @@ import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.rootCause
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.StateMachineTransactionMapping
|
||||
import net.corda.core.node.services.SignedTransactionWithStatus
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -44,12 +44,12 @@ import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.coretesting.internal.TEST_TX_TIME
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.CommercialPaper
|
||||
import net.corda.finance.contracts.asset.CASH
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
|
||||
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
@ -857,12 +857,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
}
|
||||
}
|
||||
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? {
|
||||
override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? {
|
||||
return database.transaction {
|
||||
delegate.getTransactionInternal(id)
|
||||
records.add(TxRecord.Get(id))
|
||||
delegate.getTransactionWithStatus(id)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
interface TxRecord {
|
||||
|
@ -308,7 +308,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(senderTransaction.id))
|
||||
assertFailsWith<AssertionError> { readTransactionFromDB(senderTransaction.id).status }
|
||||
assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size)
|
||||
assertNull(transactionRecovery.getTransactionInternal(senderTransaction.id))
|
||||
assertNull(transactionRecovery.getTransactionWithStatus(senderTransaction.id))
|
||||
|
||||
val receiverTransaction = newTransaction(notarySig = false)
|
||||
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
|
||||
@ -322,7 +322,7 @@ class DBTransactionStorageLedgerRecoveryTests {
|
||||
assertEquals(true, transactionRecovery.removeUnnotarisedTransaction(receiverTransaction.id))
|
||||
assertFailsWith<AssertionError> { readTransactionFromDB(receiverTransaction.id).status }
|
||||
assertFailsWith<AssertionError> { readReceiverDistributionRecordFromDB(receiverTransaction.id) }
|
||||
assertNull(transactionRecovery.getTransactionInternal(receiverTransaction.id))
|
||||
assertNull(transactionRecovery.getTransactionWithStatus(receiverTransaction.id))
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
|
@ -170,7 +170,7 @@ class DBTransactionStorageTests {
|
||||
|
||||
assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id))
|
||||
assertFailsWith<AssertionError> { readTransactionFromDB(transaction.id).status }
|
||||
assertNull(transactionStorage.getTransactionInternal(transaction.id))
|
||||
assertNull(transactionStorage.getTransactionWithStatus(transaction.id))
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
|
@ -12,6 +12,7 @@ import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.core.flows.TransactionMetadata
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.node.services.SignedTransactionWithStatus
|
||||
import net.corda.testing.node.MockServices
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -86,9 +87,9 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
|
||||
txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.UNVERIFIED))
|
||||
}
|
||||
|
||||
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.status == TransactionStatus.VERIFIED) it.stx else null }
|
||||
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.isVerified) it.stx else null }
|
||||
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? = txns[id]?.let { Pair(it.stx, it.status) }
|
||||
override fun getTransactionWithStatus(id: SecureHash): SignedTransactionWithStatus? = txns[id]?.let { SignedTransactionWithStatus(it.stx, it.status) }
|
||||
|
||||
private class TxHolder(val stx: SignedTransaction, var status: TransactionStatus) {
|
||||
val isVerified = status == TransactionStatus.VERIFIED
|
||||
|
Loading…
Reference in New Issue
Block a user