ENT-9924 Update recording of transaction flow recovery metadata into Send/Receive transaction flows. (#7374)

This commit is contained in:
Jose Coll 2023-06-02 16:05:28 +01:00 committed by GitHub
parent 2e29e36e01
commit 2c775bcc41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 530 additions and 167 deletions

View File

@ -10,6 +10,7 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.DistributionList
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
@ -26,6 +27,7 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FetchDataFlow
import net.corda.core.internal.PLATFORM_VERSION
@ -47,9 +49,11 @@ import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.issuedBy
import net.corda.finance.test.flows.CashIssueWithObserversFlow
import net.corda.finance.test.flows.CashPaymentWithObserversFlow
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery
import net.corda.node.services.persistence.DistributionRecord
import net.corda.node.services.persistence.ReceiverDistributionRecord
import net.corda.node.services.persistence.SenderDistributionRecord
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME
@ -350,28 +354,120 @@ class FinalityFlowTests : WithFinality {
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(getSenderRecoveryData(stx.id, aliceNode.database)).isNotNull
assertThat(getReceiverRecoveryData(stx.id, bobNode.database)).isNotNull
getSenderRecoveryData(stx.id, aliceNode.database).apply {
assertEquals(1, this.size)
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
}
getReceiverRecoveryData(stx.id, bobNode.database).apply {
assertEquals(StatesToRecord.ALL_VISIBLE, this?.statesToRecord)
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord)
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId)
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), this?.peersToStatesToRecord)
}
}
private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): DistributionRecord? {
@Test(timeout=300_000)
fun `two phase finality flow payment transaction with observers`() {
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
val charlieNode = createNode(CHARLIE_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
// issue some cash
aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
// standard issuance with observers passed in as FinalityFlow sessions
val stx = aliceNode.startFlowAndRunNetwork(CashPaymentWithObserversFlow(
amount = Amount(100L, GBP),
recipient = bobNode.info.singleIdentity(),
observers = setOf(charlieNode.info.singleIdentity()))).resultFuture.getOrThrow()
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(charlieNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
getSenderRecoveryData(stx.id, aliceNode.database).apply {
assertEquals(2, this.size)
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
assertEquals(StatesToRecord.ONLY_RELEVANT, this[1].statesToRecord)
assertEquals(CHARLIE_NAME.hashCode().toLong(), this[1].peerPartyId)
}
getReceiverRecoveryData(stx.id, bobNode.database).apply {
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord)
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord)
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId)
// note: Charlie assertion here is using the hinted StatesToRecord value passed to it from Alice
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT,
CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), this?.peersToStatesToRecord)
}
getReceiverRecoveryData(stx.id, charlieNode.database).apply {
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord)
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord)
// note: Charlie assertion here is using actually default StatesToRecord.ONLY_RELEVANT
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT,
CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), this?.peersToStatesToRecord)
}
// exercise the new FinalityFlow observerSessions constructor parameter
val stx3 = aliceNode.startFlowAndRunNetwork(CashPaymentWithObserversFlow(
amount = Amount(100L, GBP),
recipient = bobNode.info.singleIdentity(),
observers = setOf(charlieNode.info.singleIdentity()),
useObserverSessions = true)).resultFuture.getOrThrow()
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx3.id)).isNotNull
assertThat(bobNode.services.validatedTransactions.getTransaction(stx3.id)).isNotNull
assertThat(charlieNode.services.validatedTransactions.getTransaction(stx3.id)).isNotNull
assertEquals(2, getSenderRecoveryData(stx3.id, aliceNode.database).size)
assertThat(getReceiverRecoveryData(stx3.id, bobNode.database)).isNotNull
assertThat(getReceiverRecoveryData(stx3.id, charlieNode.database)).isNotNull
}
@Test(timeout=300_000)
fun `two phase finality flow payment transaction using confidential identities`() {
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
val stx = aliceNode.startFlowAndRunNetwork(CashPaymentFlow(
amount = Amount(100L, GBP),
recipient = bobNode.info.singleIdentity(),
anonymous = true)).resultFuture.getOrThrow().stx
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
getSenderRecoveryData(stx.id, aliceNode.database).apply {
assertEquals(1, this.size)
assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord)
assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId)
}
getReceiverRecoveryData(stx.id, bobNode.database).apply {
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord)
assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord)
assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId)
assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), this?.peersToStatesToRecord)
}
}
private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): List<SenderDistributionRecord> {
val fromDb = database.transaction {
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java.name} where tx_id = :transactionId",
DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java
).setParameter("transactionId", id.toString()).resultList.map { it }
}
return fromDb.singleOrNull()?.toSenderDistributionRecord()
return fromDb.map { it.toSenderDistributionRecord() }.also { println("SenderDistributionRecord\n$it") }
}
private fun getReceiverRecoveryData(id: SecureHash, database: CordaPersistence): DistributionRecord? {
private fun getReceiverRecoveryData(id: SecureHash, database: CordaPersistence): ReceiverDistributionRecord? {
val fromDb = database.transaction {
session.createQuery(
"from ${DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java.name} where tx_id = :transactionId",
DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java
).setParameter("transactionId", id.toString()).resultList.map { it }
}
return fromDb.singleOrNull()?.toReceiverDistributionRecord(MockCryptoService(emptyMap()))
return fromDb.singleOrNull()?.toReceiverDistributionRecord(MockCryptoService(emptyMap())).also { println("ReceiverDistributionRecord\n$it") }
}
@StartableByRPC
@ -445,12 +541,10 @@ class FinalityFlowTests : WithFinality {
override fun call(): SignedTransaction {
// Mimic ReceiveFinalityFlow but fail to finalise
try {
val stx = subFlow(ReceiveTransactionFlow(otherSideSession,
checkSufficientSignatures = false, statesToRecord = StatesToRecord.ONLY_RELEVANT, deferredAck = true))
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, false, StatesToRecord.ONLY_RELEVANT, true))
require(NotarySigCheck.needsNotarySignature(stx))
logger.info("Peer recording transaction without notary signature.")
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
TransactionMetadata(otherSideSession.counterparty.name, StatesToRecord.ONLY_RELEVANT))
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
logger.info("Peer recorded transaction without notary signature.")
@ -494,7 +588,8 @@ class FinalityFlowTests : WithFinality {
val txBuilder = DummyContract.move(stateAndRef, newOwner)
val stxn = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
val sessionWithCounterParty = initiateFlow(newOwner)
subFlow(SendTransactionFlow(sessionWithCounterParty, stxn))
subFlow(SendTransactionFlow(sessionWithCounterParty, stxn,
TransactionMetadata(ourIdentity.name, DistributionList(StatesToRecord.ONLY_RELEVANT, mapOf(BOB_NAME to StatesToRecord.ONLY_RELEVANT)))))
throw UnexpectedFlowEndException("${stxn.id}")
}
}
@ -514,6 +609,11 @@ class FinalityFlowTests : WithFinality {
version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion)))
}
private fun createNode(legalName: CordaX500Name, cordapps: List<TestCordappInternal> = emptyList(), platformVersion: Int = PLATFORM_VERSION): TestStartedNode {
return mockNet.createNode(InternalMockNodeParameters(legalName = legalName, additionalCordapps = cordapps,
version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion)))
}
private fun TestStartedNode.issuesCashTo(recipient: TestStartedNode): SignedTransaction {
return issuesCashTo(recipient.info.singleIdentity())
}

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.flows.NotarySigCheck.needsNotarySignature
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.internal.FetchDataFlow
@ -15,6 +16,7 @@ import net.corda.core.internal.pushToLoggingContext
import net.corda.core.internal.telemetry.telemetryServiceInternal
import net.corda.core.internal.warnOnce
import net.corda.core.node.StatesToRecord
import net.corda.core.node.StatesToRecord.ALL_VISIBLE
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
@ -41,6 +43,9 @@ import java.time.Duration
* can also be included, but they must specify [StatesToRecord.ALL_VISIBLE] for statesToRecord if they wish to record the
* contract states into their vaults.
*
* As of 4.11 a list of observer [FlowSession] can be specified to indicate sessions with transaction non-participants (e.g. observers).
* This enables ledger recovery to default these sessions associated StatesToRecord value to [StatesToRecord.ALL_VISIBLE].
*
* The flow returns the same transaction but with the additional signatures from the notary.
*
* NOTE: This is an inlined flow but for backwards compatibility is annotated with [InitiatingFlow].
@ -55,13 +60,14 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
override val progressTracker: ProgressTracker,
private val sessions: Collection<FlowSession>,
private val newApi: Boolean,
private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
private val statesToRecord: StatesToRecord = ONLY_RELEVANT,
private val observerSessions: Collection<FlowSession> = emptySet()) : FlowLogic<SignedTransaction>() {
@CordaInternal
data class ExtraConstructorArgs(val oldParticipants: Collection<Party>, val sessions: Collection<FlowSession>, val newApi: Boolean, val statesToRecord: StatesToRecord)
data class ExtraConstructorArgs(val oldParticipants: Collection<Party>, val sessions: Collection<FlowSession>, val newApi: Boolean, val statesToRecord: StatesToRecord, val observerSessions: Collection<FlowSession>)
@CordaInternal
fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord)
fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord, observerSessions)
@Deprecated(DEPRECATION_MSG)
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>, progressTracker: ProgressTracker) : this(
@ -133,6 +139,10 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
progressTracker: ProgressTracker
) : this(transaction, oldParticipants, progressTracker, sessions, true)
constructor(transaction: SignedTransaction,
sessions: Collection<FlowSession>,
observerSessions: Collection<FlowSession>) : this(transaction, emptyList(), tracker(), sessions, true, observerSessions = observerSessions)
companion object {
private const val DEPRECATION_MSG = "It is unsafe to use this constructor as it requires nodes to automatically " +
"accept notarised transactions without first checking their relevancy. Instead, use one of the constructors " +
@ -158,6 +168,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_NOTARY_ERROR, FINALISING_TRANSACTION, BROADCASTING)
}
private lateinit var externalTxParticipants: Set<Party>
private lateinit var txnMetadata: TransactionMetadata
@Suspendable
@Suppress("ComplexMethod", "NestedBlockDepth")
@Throws(NotaryException::class)
@ -169,6 +182,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
require(sessions.none { serviceHub.myInfo.isLegalIdentity(it.counterparty) }) {
"Do not provide flow sessions for the local node. FinalityFlow will record the notarised transaction locally."
}
sessions.intersect(observerSessions.toSet()).let {
require(it.isEmpty()) { "The following parties are specified both in flow sessions and observer flow sessions: $it" }
}
}
// Note: this method is carefully broken up to minimize the amount of data reachable from the stack at
@ -179,7 +195,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
transaction.pushToLoggingContext()
logCommandData()
val ledgerTransaction = verifyTx()
val externalTxParticipants = extractExternalParticipants(ledgerTransaction)
externalTxParticipants = extractExternalParticipants(ledgerTransaction)
if (newApi) {
val sessionParties = sessions.map { it.counterparty }.toSet()
@ -199,12 +215,14 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
// - broadcast notary signature to external participants (finalise remotely)
// - finalise locally
val (oldPlatformSessions, newPlatformSessions) = sessions.partition {
val (oldPlatformSessions, newPlatformSessions) = (sessions + observerSessions).partition {
serviceHub.networkMapCache.getNodeByLegalIdentity(it.counterparty)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY
}
val requiresNotarisation = needsNotarySignature(transaction)
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
txnMetadata = TransactionMetadata(serviceHub.myInfo.legalIdentities.first().name,
DistributionList(statesToRecord, deriveStatesToRecord(newPlatformSessions)))
if (useTwoPhaseFinality) {
val stxn = if (requiresNotarisation) {
recordLocallyAndBroadcast(newPlatformSessions, transaction)
@ -226,11 +244,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
}
else {
if (newPlatformSessions.isNotEmpty())
finaliseLocallyAndBroadcast(newPlatformSessions, transaction,
TransactionMetadata(
serviceHub.myInfo.legalIdentities.first().name,
statesToRecord,
sessions.map { it.counterparty.name }.toSet()))
finaliseLocallyAndBroadcast(newPlatformSessions, transaction)
else
recordTransactionLocally(transaction)
transaction
@ -258,9 +272,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
}
@Suspendable
private fun finaliseLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction, metadata: TransactionMetadata) {
private fun finaliseLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocallyAndBroadcast", flowLogic = this) {
finaliseLocally(tx, metadata = metadata)
finaliseLocally(tx)
progressTracker.currentStep = BROADCASTING
broadcast(sessions, tx)
}
@ -272,7 +286,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
sessions.forEach { session ->
try {
logger.debug { "Sending transaction to party $session." }
subFlow(SendTransactionFlow(session, tx))
subFlow(SendTransactionFlow(session, tx, txnMetadata))
txnMetadata = txnMetadata.copy(persist = false)
} catch (e: UnexpectedFlowEndException) {
throw UnexpectedFlowEndException(
"${session.counterparty} has finished prematurely and we're trying to send them a transaction." +
@ -285,6 +300,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
}
}
private fun deriveStatesToRecord(newPlatformSessions: Collection<FlowSession>): Map<CordaX500Name, StatesToRecord> {
val derivedObserverSessions = newPlatformSessions.map { it.counterparty }.toSet() - externalTxParticipants
val txParticipantSessions = externalTxParticipants
return txParticipantSessions.map { it.name to ONLY_RELEVANT }.toMap() +
(derivedObserverSessions + observerSessions.map { it.counterparty }).map { it.name to ALL_VISIBLE }
}
@Suspendable
private fun broadcastSignaturesAndFinalise(sessions: Collection<FlowSession>, notarySignatures: List<TransactionSignature>) {
progressTracker.currentStep = BROADCASTING_POST_NOTARISATION
@ -309,12 +331,11 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
}
@Suspendable
private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List<TransactionSignature> = emptyList(),
metadata: TransactionMetadata? = null) {
private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List<TransactionSignature> = emptyList()) {
progressTracker.currentStep = FINALISING_TRANSACTION
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocally", flowLogic = this) {
if (notarySignatures.isEmpty()) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord, metadata!!)
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord)
logger.info("Finalised transaction locally.")
} else {
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
@ -355,7 +376,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
for (session in sessions) {
try {
logger.debug { "Sending transaction to party $session." }
subFlow(SendTransactionFlow(session, tx))
subFlow(SendTransactionFlow(session, tx, txnMetadata))
txnMetadata = txnMetadata.copy(persist = false)
} catch (e: UnexpectedFlowEndException) {
throw UnexpectedFlowEndException(
"${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
@ -378,7 +400,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
if (!serviceHub.myInfo.isLegalIdentity(recipient)) {
logger.debug { "Sending transaction to party $recipient." }
val session = initiateFlow(recipient)
subFlow(SendTransactionFlow(session, notarised))
subFlow(SendTransactionFlow(session, notarised, txnMetadata))
txnMetadata = txnMetadata.copy(persist = false)
logger.info("Party $recipient received the transaction.")
}
}
@ -404,11 +427,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
private fun recordUnnotarisedTransaction(tx: SignedTransaction): SignedTransaction {
progressTracker.currentStep = RECORD_UNNOTARISED
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx,
TransactionMetadata(
serviceHub.myInfo.legalIdentities.first().name,
statesToRecord,
sessions.map { it.counterparty.name }.toSet()))
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx)
logger.info("Recorded un-notarised transaction locally.")
return tx
}
@ -487,7 +506,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
@Suppress("ComplexMethod", "NestedBlockDepth")
@Suspendable
override fun call(): SignedTransaction {
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true))
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, false, statesToRecord, true))
val requiresNotarisation = needsNotarySignature(stx)
val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
@ -495,8 +514,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
if (requiresNotarisation) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
logger.debug { "Peer recording transaction without notary signature." }
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
TransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx)
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
@ -522,8 +540,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
}
} else {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord,
TransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord)
logger.info("Peer recorded transaction with recovery metadata.")
}
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)

View File

@ -24,8 +24,14 @@ data class FlowTransactionInfo(
@CordaSerializable
data class TransactionMetadata(
val initiator: CordaX500Name,
val statesToRecord: StatesToRecord? = StatesToRecord.ONLY_RELEVANT,
val peers: Set<CordaX500Name>? = null
val distributionList: DistributionList,
val persist: Boolean = true // hint to persist to transactional store
)
@CordaSerializable
data class DistributionList(
val senderStatesToRecord: StatesToRecord,
val peersToStatesToRecord: Map<CordaX500Name, StatesToRecord>
)
@CordaSerializable

View File

@ -1,8 +1,13 @@
package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.*
import net.corda.core.contracts.AttachmentResolutionException
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.checkParameterHash
import net.corda.core.internal.pushToLoggingContext
import net.corda.core.node.StatesToRecord
@ -53,19 +58,23 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
} else {
logger.trace { "Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}" }
}
val stx = otherSideSession.receive<SignedTransaction>().unwrap {
it.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
checkParameterHash(it.networkParametersHash)
subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord, deferredAck))
logger.info("Transaction dependencies resolution completed.")
try {
it.verify(serviceHub, checkSufficientSignatures)
it
} catch (e: Exception) {
logger.warn("Transaction verification failed.")
throw e
}
val payload = otherSideSession.receive<Any>().unwrap { it }
val stx =
if (payload is SignedTransactionWithDistributionList) {
recordTransactionMetadata(payload.stx, payload.distributionList)
payload.stx
} else payload as SignedTransaction
stx.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
checkParameterHash(stx.networkParametersHash)
subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, deferredAck))
logger.info("Transaction dependencies resolution completed.")
try {
stx.verify(serviceHub, checkSufficientSignatures)
} catch (e: Exception) {
logger.warn("Transaction verification failed.")
throw e
}
if (checkSufficientSignatures) {
// We should only send a transaction to the vault for processing if we did in fact fully verify it, and
@ -78,6 +87,21 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow
return stx
}
@Suspendable
private fun recordTransactionMetadata(stx: SignedTransaction, distributionList: DistributionList?) {
distributionList?.let {
val txnMetadata = TransactionMetadata(otherSideSession.counterparty.name,
DistributionList(distributionList.senderStatesToRecord,
distributionList.peersToStatesToRecord.map { (peer, peerStatesToRecord) ->
if (peer == ourIdentity.name)
peer to statesToRecord // use actual value
else
peer to peerStatesToRecord // use hinted value
}.toMap()))
(serviceHub as ServiceHubCoreInternal).recordTransactionRecoveryMetadata(stx.id, txnMetadata, ourIdentity.name)
}
}
/**
* Hook to perform extra checks on the received transaction just before it's recorded. The transaction has already
* been resolved and verified at this point.

View File

@ -4,14 +4,24 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.NamedByHash
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap
import kotlin.collections.List
import kotlin.collections.MutableSet
import kotlin.collections.Set
import kotlin.collections.flatMap
import kotlin.collections.map
import kotlin.collections.mutableSetOf
import kotlin.collections.plus
import kotlin.collections.toSet
/**
* In the words of Matt working code is more important then pretty code. This class that contains code that may
@ -66,8 +76,16 @@ class MaybeSerializedSignedTransaction(override val id: SecureHash, val serializ
*
* @param otherSide the target party.
* @param stx the [SignedTransaction] being sent to the [otherSideSession].
* @property txnMetadata transaction recovery metadata (eg. used by Two Phase Finality).
*/
open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction) : DataVendingFlow(otherSide, stx)
open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction, txnMetadata: TransactionMetadata) : DataVendingFlow(otherSide, stx, txnMetadata) {
constructor(otherSide: FlowSession, stx: SignedTransaction) : this(otherSide, stx,
TransactionMetadata(DUMMY_PARTICIPANT_NAME, DistributionList(StatesToRecord.NONE, mapOf(otherSide.counterparty.name to StatesToRecord.ALL_VISIBLE))))
// Note: DUMMY_PARTICIPANT_NAME to be substituted with actual "ourIdentity.name" in flow call()
companion object {
val DUMMY_PARTICIPANT_NAME = CordaX500Name("Transaction Participant", "London", "GB")
}
}
/**
* The [SendStateAndRefFlow] should be used to send a list of input [StateAndRef] to another peer that wishes to verify
@ -80,7 +98,9 @@ open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction) :
*/
open class SendStateAndRefFlow(otherSideSession: FlowSession, stateAndRefs: List<StateAndRef<*>>) : DataVendingFlow(otherSideSession, stateAndRefs)
open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) : FlowLogic<Void?>() {
open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any, val txnMetadata: TransactionMetadata? = null) : FlowLogic<Void?>() {
constructor(otherSideSession: FlowSession, payload: Any) : this(otherSideSession, payload, null)
@Suspendable
protected open fun sendPayloadAndReceiveDataRequest(otherSideSession: FlowSession, payload: Any) = otherSideSession.sendAndReceive<FetchDataFlow.Request>(payload)
@ -89,6 +109,7 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
// User can override this method to perform custom request verification.
}
@Suppress("ComplexCondition", "ComplexMethod")
@Suspendable
override fun call(): Void? {
val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize
@ -117,6 +138,15 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
else -> throw Exception("Unknown payload type: ${payload::class.java} ?")
}
// store and share transaction recovery metadata if required
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
val toTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) {
payload = SignedTransactionWithDistributionList(payload, txnMetadata.distributionList)
if (txnMetadata.persist)
(serviceHub as ServiceHubCoreInternal).recordTransactionRecoveryMetadata(payload.stx.id, txnMetadata.copy(initiator = ourIdentity.name), ourIdentity.name)
}
// This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need
// to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of
// data request.
@ -240,3 +270,9 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
}
}
}
@CordaSerializable
data class SignedTransactionWithDistributionList(
val stx: SignedTransaction,
val distributionList: DistributionList
)

View File

@ -5,6 +5,7 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.notary.NotaryService
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
@ -35,9 +36,8 @@ interface ServiceHubCoreInternal : ServiceHub {
* This is expected to be run within a database transaction.
*
* @param txn The transaction to record.
* @param metadata Finality flow recovery metadata.
*/
fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata)
fun recordUnnotarisedTransaction(txn: SignedTransaction)
/**
* Removes transaction from data store.
@ -61,9 +61,17 @@ interface ServiceHubCoreInternal : ServiceHub {
*
* @param txn The transaction to record.
* @param statesToRecord how the vault should treat the output states of the transaction.
* @param metadata Finality flow recovery metadata.
*/
fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata)
fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord)
/**
* Records [TransactionMetadata] for a given txnId.
*
* @param txnId The SecureHash of a transaction.
* @param txnMetadata The recovery metadata associated with a transaction.
* @param caller The CordaX500Name of the party calling this operation.
*/
fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name)
}
interface TransactionsResolver {

View File

@ -14,6 +14,7 @@ import net.corda.core.internal.PlatformVersionSwitches.TWO_PHASE_FINALITY
import net.corda.core.internal.telemetry.TelemetryComponent
import net.corda.core.node.services.*
import net.corda.core.node.services.diagnostics.DiagnosticsService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.LedgerTransaction
@ -90,6 +91,7 @@ interface ServicesForResolution {
* Controls whether the transaction is sent to the vault at all, and if so whether states have to be relevant
* or not in order to be recorded. Used in [ServiceHub.recordTransactions]
*/
@CordaSerializable
enum class StatesToRecord {
/** The received transaction is not sent to the vault at all. This is used within transaction resolution. */
NONE,

View File

@ -8,6 +8,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.TransactionStatus
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ResolveTransactionsFlow
@ -194,6 +195,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) =
recordTransactions(statesToRecord, txs, SIGNATURE_VERIFICATION_DISABLED)
override fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) =
validatedTransactions.addTransactionRecoveryMetadata(txnId, txnMetadata, caller)
@Suppress("NestedBlockDepth")
@VisibleForTesting
fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>, disableSignatureVerification: Boolean) {
@ -240,27 +244,25 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
)
}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) {
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord) {
requireSupportedHashType(txn)
if (txn.coreTransaction is WireTransaction)
txn.verifyRequiredSignatures()
database.transaction {
recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) {
val isInitiator = metadata.initiator == myInfo.legalIdentities.first().name
validatedTransactions.finalizeTransaction(txn, metadata, isInitiator)
validatedTransactions.finalizeTransaction(txn)
}
}
}
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) {
override fun recordUnnotarisedTransaction(txn: SignedTransaction) {
if (txn.coreTransaction is WireTransaction) {
txn.notary?.let { notary ->
txn.verifySignaturesExcept(notary.owningKey)
} ?: txn.verifyRequiredSignatures()
}
database.transaction {
val isInitiator = metadata.initiator == myInfo.legalIdentities.first().name
validatedTransactions.addUnnotarisedTransaction(txn, metadata, isInitiator)
validatedTransactions.addUnnotarisedTransaction(txn)
}
}
@ -360,10 +362,18 @@ interface WritableTransactionStorage : TransactionStorage {
* Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG* and inclusive of flow recovery metadata.
*
* @param transaction The transaction to be recorded.
* @param metadata Finality flow recovery metadata.
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
*/
fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean
fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean
/**
* Record transaction recovery metadata for a given transaction id.
*
* @param id The SecureHash of the transaction to be recorded.
* @param metadata transaction recovery metadata.
* @param caller The CordaX500Name of the party calling this operation.
*/
fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name)
/**
* Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store.
@ -375,10 +385,9 @@ interface WritableTransactionStorage : TransactionStorage {
* Add a finalised transaction to the store with flow recovery metadata.
*
* @param transaction The transaction to be recorded.
* @param metadata Finality flow recovery metadata.
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
*/
fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean
fun finalizeTransaction(transaction: SignedTransaction): Boolean
/**
* Update a previously un-notarised transaction including associated notary signatures.

View File

@ -56,7 +56,6 @@ class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMap
private fun updateInfoDB(partyHashCode: Long, partyName: CordaX500Name) {
database.transaction {
if (queryByPartyId(session, partyHashCode) == null) {
println("PartyInfo: $partyHashCode -> $partyName")
session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode, partyName.toString()))
}
}

View File

@ -4,6 +4,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
@ -208,12 +209,14 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac
updateTransaction(transaction.id)
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
override fun addUnnotarisedTransaction(transaction: SignedTransaction) =
addTransaction(transaction, TransactionStatus.IN_FLIGHT) {
false
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { }
override fun finalizeTransaction(transaction: SignedTransaction) =
addTransaction(transaction) {
false
}

View File

@ -10,7 +10,6 @@ import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.node.CordaClock
import net.corda.node.services.network.PersistentPartyInfoCache
import net.corda.nodeapi.internal.cryptoservice.CryptoService
@ -19,6 +18,7 @@ import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.serialization.internal.CordaSerializationEncoding
import org.hibernate.annotations.Immutable
import java.io.Serializable
import java.lang.IllegalStateException
import java.time.Instant
import java.util.concurrent.atomic.AtomicLong
import javax.persistence.Column
@ -100,13 +100,13 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
@Column(name = "sender_states_to_record", nullable = false)
val senderStatesToRecord: StatesToRecord
) {
constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, peerPartyIds: Set<Long>, statesToRecord: StatesToRecord, cryptoService: CryptoService) :
constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, peersToStatesToRecord: Map<Long, StatesToRecord>, senderStatesToRecord: StatesToRecord, receiverStatesToRecord: StatesToRecord, cryptoService: CryptoService) :
this(PersistentKey(key),
txId = txId.toString(),
senderPartyId = initiatorPartyId,
distributionList = cryptoService.encrypt(peerPartyIds.serialize(context = contextToUse().withEncoding(CordaSerializationEncoding.SNAPPY)).bytes),
receiverStatesToRecord = statesToRecord,
senderStatesToRecord = StatesToRecord.NONE // to be set in follow-up PR.
distributionList = cryptoService.encrypt(peersToStatesToRecord.serialize(context = contextToUse().withEncoding(CordaSerializationEncoding.SNAPPY)).bytes),
receiverStatesToRecord = receiverStatesToRecord,
senderStatesToRecord = senderStatesToRecord
)
fun toReceiverDistributionRecord(cryptoService: CryptoService) =
@ -115,6 +115,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
this.senderPartyId,
cryptoService.decrypt(this.distributionList).deserialize(context = contextToUse()),
this.receiverStatesToRecord,
this.senderStatesToRecord,
this.compositeKey.timestamp
)
}
@ -141,17 +142,33 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
}
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
return addTransaction(transaction, TransactionStatus.IN_FLIGHT) {
addTransactionRecoveryMetadata(transaction.id, metadata, isInitiator, clock)
@Suppress("IMPLICIT_CAST_TO_ANY")
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) {
database.transaction {
if (caller == metadata.initiator) {
metadata.distributionList.peersToStatesToRecord.map { (peer, _) ->
val senderDistributionRecord = DBSenderDistributionRecord(PersistentKey(Key(clock.instant())),
id.toString(),
partyInfoCache.getPartyIdByCordaX500Name(peer),
metadata.distributionList.senderStatesToRecord)
session.save(senderDistributionRecord)
}
} else {
val receiverStatesToRecord = metadata.distributionList.peersToStatesToRecord[caller] ?: throw IllegalStateException("Missing peer $caller in distribution list of Receiver recovery metadata")
val receiverDistributionRecord =
DBReceiverDistributionRecord(Key(clock.instant()),
id,
partyInfoCache.getPartyIdByCordaX500Name(metadata.initiator),
metadata.distributionList.peersToStatesToRecord.map { (peer, statesToRecord) ->
partyInfoCache.getPartyIdByCordaX500Name(peer) to statesToRecord }.toMap(),
metadata.distributionList.senderStatesToRecord,
receiverStatesToRecord,
cryptoService)
session.save(receiverDistributionRecord)
}
}
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
addTransaction(transaction) {
addTransactionRecoveryMetadata(transaction.id, metadata, isInitiator, clock)
}
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return database.transaction {
super.removeUnnotarisedTransaction(id)
@ -260,31 +277,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence,
results.map { it.toReceiverDistributionRecord(cryptoService) }.toList()
}
}
@Suppress("IMPLICIT_CAST_TO_ANY")
private fun addTransactionRecoveryMetadata(txId: SecureHash, metadata: TransactionMetadata, isInitiator: Boolean, clock: CordaClock): Boolean {
database.transaction {
if (isInitiator) {
metadata.peers?.map { peer ->
val senderDistributionRecord = DBSenderDistributionRecord(PersistentKey(Key(clock.instant())),
txId.toString(),
partyInfoCache.getPartyIdByCordaX500Name(peer),
metadata.statesToRecord ?: StatesToRecord.ONLY_RELEVANT)
session.save(senderDistributionRecord)
}
} else {
val receiverDistributionRecord =
DBReceiverDistributionRecord(Key(clock.instant()),
txId,
partyInfoCache.getPartyIdByCordaX500Name(metadata.initiator),
metadata.peers?.map { partyInfoCache.getPartyIdByCordaX500Name(it) }?.toSet() ?: emptySet(),
metadata.statesToRecord ?: StatesToRecord.ONLY_RELEVANT,
cryptoService)
session.save(receiverDistributionRecord)
}
}
return false
}
}
// TO DO: https://r3-cev.atlassian.net/browse/ENT-9876
@ -316,8 +308,9 @@ data class SenderDistributionRecord(
data class ReceiverDistributionRecord(
override val txId: SecureHash,
val initiatorPartyId: Long, // CordaX500Name hashCode()
val peerPartyIds: Set<Long>, // CordaX500Name hashCode()
val peersToStatesToRecord: Map<Long, StatesToRecord>, // CordaX500Name hashCode() -> StatesToRecord
override val statesToRecord: StatesToRecord,
val senderStatesToRecord: StatesToRecord,
override val timestamp: Instant
) : DistributionRecord(txId, statesToRecord, timestamp)

View File

@ -801,23 +801,29 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
return true
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
override fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean {
database.transaction {
records.add(TxRecord.Add(transaction))
delegate.addUnnotarisedTransaction(transaction, metadata, isInitiator)
delegate.addUnnotarisedTransaction(transaction)
}
return true
}
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) {
database.transaction {
delegate.addTransactionRecoveryMetadata(id, metadata, caller)
}
}
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return database.transaction {
delegate.removeUnnotarisedTransaction(id)
}
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
override fun finalizeTransaction(transaction: SignedTransaction): Boolean {
database.transaction {
delegate.finalizeTransaction(transaction, metadata, isInitiator)
delegate.finalizeTransaction(transaction)
}
return true
}

View File

@ -6,10 +6,13 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sign
import net.corda.core.flows.DistributionList
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.RecoveryTimeWindow
import net.corda.core.node.NodeInfo
import net.corda.core.node.StatesToRecord
import net.corda.core.node.StatesToRecord.ALL_VISIBLE
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
import net.corda.core.node.StatesToRecord.NONE
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.NetworkHostAndPort
@ -79,14 +82,18 @@ class DBTransactionStorageLedgerRecoveryTests {
@Test(timeout = 300_000)
fun `query local ledger for transactions with recovery peers within time window`() {
val beforeFirstTxn = now()
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
val txn = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn)
transactionRecovery.addTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME)
val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn,
untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow)
assertEquals(1, results.size)
val afterFirstTxn = now()
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(CHARLIE_NAME)), true)
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))), ALICE_NAME)
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size)
assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn)).size)
}
@ -94,9 +101,11 @@ class DBTransactionStorageLedgerRecoveryTests {
@Test(timeout = 300_000)
fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() {
val transaction1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction1, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME)
val transaction2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(transaction2, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id))
assertEquals(1, results.size)
@ -106,18 +115,22 @@ class DBTransactionStorageLedgerRecoveryTests {
fun `query local ledger by distribution record type`() {
val transaction1 = newTransaction()
// sender txn
transactionRecovery.addUnnotarisedTransaction(transaction1, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(transaction1)
transactionRecovery.addTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME)
val transaction2 = newTransaction()
// receiver txn
transactionRecovery.addUnnotarisedTransaction(transaction2, TransactionMetadata(BOB_NAME, StatesToRecord.ALL_VISIBLE, setOf(ALICE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(transaction2)
transactionRecovery.addTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))), ALICE_NAME)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let {
assertEquals(1, it.size)
assertEquals((it[0] as SenderDistributionRecord).peerPartyId, BOB_NAME.hashCode().toLong())
assertEquals(BOB_NAME.hashCode().toLong(), (it[0] as SenderDistributionRecord).peerPartyId)
assertEquals(ALL_VISIBLE, (it[0] as SenderDistributionRecord).statesToRecord)
}
transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let {
assertEquals(1, it.size)
assertEquals((it[0] as ReceiverDistributionRecord).initiatorPartyId, BOB_NAME.hashCode().toLong())
assertEquals(BOB_NAME.hashCode().toLong(), (it[0] as ReceiverDistributionRecord).initiatorPartyId)
assertEquals(ALL_VISIBLE, (it[0] as ReceiverDistributionRecord).statesToRecord)
}
val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL)
assertEquals(2, resultsAll.size)
@ -125,18 +138,28 @@ class DBTransactionStorageLedgerRecoveryTests {
@Test(timeout = 300_000)
fun `query for sender distribution records by peers`() {
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(CHARLIE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME, CHARLIE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(BOB_NAME, StatesToRecord.ONLY_RELEVANT, setOf(ALICE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(CHARLIE_NAME, StatesToRecord.ONLY_RELEVANT), true)
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME)
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))), ALICE_NAME)
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE))), ALICE_NAME)
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT))), BOB_NAME)
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())), CHARLIE_NAME)
assertEquals(5, readSenderDistributionRecordFromDB().size)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let {
assertEquals(2, it.size)
assertEquals(it[0].statesToRecord, StatesToRecord.ALL_VISIBLE)
assertEquals(it[1].statesToRecord, StatesToRecord.ONLY_RELEVANT)
assertEquals(it[0].statesToRecord, ALL_VISIBLE)
assertEquals(it[1].statesToRecord, ONLY_RELEVANT)
}
assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size)
assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size)
@ -144,59 +167,93 @@ class DBTransactionStorageLedgerRecoveryTests {
@Test(timeout = 300_000)
fun `query for receiver distribution records by initiator`() {
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME, CHARLIE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.NONE, setOf(CHARLIE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(BOB_NAME, StatesToRecord.ALL_VISIBLE, setOf(ALICE_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(CHARLIE_NAME, StatesToRecord.ONLY_RELEVANT), false)
val txn1 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn1)
transactionRecovery.addTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE))), BOB_NAME)
val txn2 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn2)
transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))), BOB_NAME)
val txn3 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn3)
transactionRecovery.addTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE))), CHARLIE_NAME)
val txn4 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn4)
transactionRecovery.addTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME,
DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))), ALICE_NAME)
val txn5 = newTransaction()
transactionRecovery.addUnnotarisedTransaction(txn5)
transactionRecovery.addTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME,
DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))), BOB_NAME)
val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS))
transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let {
assertEquals(3, it.size)
assertEquals(it[0].statesToRecord, StatesToRecord.ALL_VISIBLE)
assertEquals(it[1].statesToRecord, StatesToRecord.ONLY_RELEVANT)
assertEquals(it[2].statesToRecord, StatesToRecord.NONE)
assertEquals(it[0].statesToRecord, ALL_VISIBLE)
assertEquals(it[1].statesToRecord, ONLY_RELEVANT)
assertEquals(it[2].statesToRecord, NONE)
}
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size)
assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size)
assertEquals(2, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME, CHARLIE_NAME)).size)
}
@Test(timeout = 300_000)
fun `transaction without peers does not store recovery metadata in database`() {
val senderTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())), ALICE_NAME)
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size)
}
@Test(timeout = 300_000)
fun `create un-notarised transaction with flow metadata and validate status in db`() {
val senderTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(senderTransaction, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME)
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
readSenderDistributionRecordFromDB(senderTransaction.id).let {
assertEquals(1, it.size)
assertEquals(StatesToRecord.ALL_VISIBLE, it[0].statesToRecord)
assertEquals(ALL_VISIBLE, it[0].statesToRecord)
assertEquals(BOB_NAME, partyInfoCache.getCordaX500NameByPartyId(it[0].peerPartyId))
}
val receiverTransaction = newTransaction()
transactionRecovery.addUnnotarisedTransaction(receiverTransaction, TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME)), false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
transactionRecovery.addTransactionRecoveryMetadata(receiverTransaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE))), BOB_NAME)
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)
readReceiverDistributionRecordFromDB(receiverTransaction.id).let {
assertEquals(StatesToRecord.ONLY_RELEVANT, it.statesToRecord)
assertEquals(ALL_VISIBLE, it.statesToRecord)
assertEquals(ONLY_RELEVANT, it.senderStatesToRecord)
assertEquals(ALICE_NAME, partyInfoCache.getCordaX500NameByPartyId(it.initiatorPartyId))
assertEquals(setOf(BOB_NAME), it.peerPartyIds.map { partyInfoCache.getCordaX500NameByPartyId(it) }.toSet() )
assertEquals(setOf(BOB_NAME), it.peersToStatesToRecord.map { (peer, _) -> partyInfoCache.getCordaX500NameByPartyId(peer) }.toSet() )
}
}
@Test(timeout = 300_000)
fun `finalize transaction with recovery metadata`() {
val transaction = newTransaction(notarySig = false)
transactionRecovery.finalizeTransaction(transaction,
TransactionMetadata(ALICE_NAME), false)
transactionRecovery.finalizeTransaction(transaction)
transactionRecovery.addTransactionRecoveryMetadata(transaction.id,
TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ALL_VISIBLE))), ALICE_NAME)
assertEquals(VERIFIED, readTransactionFromDB(transaction.id).status)
assertEquals(StatesToRecord.ONLY_RELEVANT, readReceiverDistributionRecordFromDB(transaction.id).statesToRecord)
readSenderDistributionRecordFromDB(transaction.id).apply {
assertEquals(1, this.size)
assertEquals(ONLY_RELEVANT, this[0].statesToRecord)
}
}
@Test(timeout = 300_000)
fun `remove un-notarised transaction and associated recovery metadata`() {
val senderTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(senderTransaction, TransactionMetadata(ALICE.name, peers = setOf(BOB.name, CHARLIE_NAME)), true)
transactionRecovery.addUnnotarisedTransaction(senderTransaction)
transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE.name,
DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT, CHARLIE_NAME to ONLY_RELEVANT))), BOB.name)
assertNull(transactionRecovery.getTransaction(senderTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status)
@ -206,7 +263,9 @@ class DBTransactionStorageLedgerRecoveryTests {
assertNull(transactionRecovery.getTransactionInternal(senderTransaction.id))
val receiverTransaction = newTransaction(notarySig = false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction, TransactionMetadata(ALICE.name), false)
transactionRecovery.addUnnotarisedTransaction(receiverTransaction)
transactionRecovery.addTransactionRecoveryMetadata(receiverTransaction.id, TransactionMetadata(ALICE.name,
DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT))), ALICE.name)
assertNull(transactionRecovery.getTransaction(receiverTransaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status)

View File

@ -10,7 +10,6 @@ import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.sign
import net.corda.core.flows.TransactionMetadata
import net.corda.core.serialization.deserialize
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -109,7 +108,7 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction()
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
transactionStorage.addUnnotarisedTransaction(transaction)
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
}
@ -132,7 +131,7 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
transactionStorage.addUnnotarisedTransaction(transaction)
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, emptyList())
@ -148,7 +147,7 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
transactionStorage.addUnnotarisedTransaction(transaction)
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
val notarySig = notarySig(transaction.id)
@ -165,7 +164,7 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true)
transactionStorage.addUnnotarisedTransaction(transaction)
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
@ -199,7 +198,7 @@ class DBTransactionStorageTests {
val transactionWithoutNotarySig = newTransaction(notarySig = false)
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig, TransactionMetadata(ALICE.party.name), false)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig)
assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySig.id).status)
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
@ -230,7 +229,7 @@ class DBTransactionStorageTests {
val transactionWithoutNotarySigs = newTransaction(notarySig = false)
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs, TransactionMetadata(ALICE.party.name), false)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs)
assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)

View File

@ -2,17 +2,22 @@ package net.corda.finance.test.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryException
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashException
import net.corda.finance.issuedBy
import java.util.Currency
@ -32,9 +37,18 @@ class CashIssueWithObserversFlow(private val amount: Amount<Currency>,
val tx = serviceHub.signInitialTransaction(builder, signers)
progressTracker.currentStep = Companion.FINALISING_TX
val observerSessions = observers.map { initiateFlow(it) }
val notarised = finaliseTx(tx, observerSessions, "Unable to notarise issue")
val notarised = finalise(tx, observerSessions, "Unable to notarise issue")
return Result(notarised, ourIdentity)
}
@Suspendable
private fun finalise(tx: SignedTransaction, sessions: Collection<FlowSession>, message: String): SignedTransaction {
try {
return subFlow(FinalityFlow(tx, sessions))
} catch (e: NotaryException) {
throw CashException(message, e)
}
}
}
@InitiatedBy(CashIssueWithObserversFlow::class)
@ -42,7 +56,7 @@ class CashIssueReceiverFlowWithObservers(private val otherSide: FlowSession) : F
@Suspendable
override fun call() {
if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) {
subFlow(ReceiveFinalityFlow(otherSide))
subFlow(ReceiveFinalityFlow(otherSide, statesToRecord = StatesToRecord.ALL_VISIBLE))
}
}
}

View File

@ -0,0 +1,82 @@
package net.corda.finance.test.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.contracts.InsufficientBalanceException
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryException
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.flows.CashException
import net.corda.finance.workflows.asset.CashUtils
import java.util.Currency
@StartableByRPC
@InitiatingFlow
open class CashPaymentWithObserversFlow(
val amount: Amount<Currency>,
val recipient: Party,
val observers: Set<Party>,
private val useObserverSessions: Boolean = false
) : AbstractCashFlow<SignedTransaction>(tracker()) {
@Suspendable
override fun call(): SignedTransaction {
val recipientSession = initiateFlow(recipient)
val observerSessions = observers.map { initiateFlow(it) }
val builder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
logger.info("Generating spend for: ${builder.lockId}")
val (spendTX, keysForSigning) = try {
CashUtils.generateSpend(
serviceHub,
builder,
amount,
ourIdentityAndCert,
recipient
)
} catch (e: InsufficientBalanceException) {
throw CashException("Insufficient cash for spend: ${e.message}", e)
}
logger.info("Signing transaction for: ${spendTX.lockId}")
val tx = serviceHub.signInitialTransaction(spendTX, keysForSigning)
logger.info("Finalising transaction for: ${tx.id}")
val sessionsForFinality = if (serviceHub.myInfo.isLegalIdentity(recipient)) emptyList() else listOf(recipientSession)
val notarised = finalise(tx, sessionsForFinality, observerSessions)
logger.info("Finalised transaction for: ${notarised.id}")
return notarised
}
@Suspendable
private fun finalise(tx: SignedTransaction,
sessions: Collection<FlowSession>,
observerSessions: Collection<FlowSession>): SignedTransaction {
try {
return if (useObserverSessions)
subFlow(FinalityFlow(tx, sessions, observerSessions = observerSessions))
else
subFlow(FinalityFlow(tx, sessions + observerSessions))
} catch (e: NotaryException) {
throw CashException("Unable to notarise spend", e)
}
}
}
@InitiatedBy(CashPaymentWithObserversFlow::class)
class CashPaymentReceiverWithObserversFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
}

View File

@ -11,6 +11,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.core.flows.TransactionMetadata
import net.corda.core.flows.TransactionStatus
import net.corda.core.identity.CordaX500Name
import net.corda.testing.node.MockServices
import rx.Observable
import rx.subjects.PublishSubject
@ -55,15 +56,17 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
}
}
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean {
override fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean {
return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.IN_FLIGHT)) == null
}
override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { }
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return txns.remove(id) != null
}
override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) =
override fun finalizeTransaction(transaction: SignedTransaction) =
addTransaction(transaction)
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>): Boolean {

View File

@ -9,6 +9,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowException
import net.corda.core.flows.TransactionMetadata
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.notary.NotaryService
@ -139,13 +140,15 @@ data class TestTransactionDSLInterpreter private constructor(
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory())
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) {}
override fun recordUnnotarisedTransaction(txn: SignedTransaction) {}
override fun removeUnnotarisedTransaction(id: SecureHash) {}
override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) {}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord) {}
override fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) {}
}
private fun copy(): TestTransactionDSLInterpreter =