diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index 77a7731d39..35dc41dadd 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -10,6 +10,7 @@ import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.TransactionVerificationException import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature +import net.corda.core.flows.DistributionList import net.corda.core.flows.FinalityFlow import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic @@ -26,6 +27,7 @@ import net.corda.core.flows.StartableByRPC import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionStatus import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.FetchDataFlow import net.corda.core.internal.PLATFORM_VERSION @@ -47,9 +49,11 @@ import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.issuedBy import net.corda.finance.test.flows.CashIssueWithObserversFlow +import net.corda.finance.test.flows.CashPaymentWithObserversFlow import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorageLedgerRecovery -import net.corda.node.services.persistence.DistributionRecord +import net.corda.node.services.persistence.ReceiverDistributionRecord +import net.corda.node.services.persistence.SenderDistributionRecord import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME @@ -350,28 +354,120 @@ class FinalityFlowTests : WithFinality { assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull - assertThat(getSenderRecoveryData(stx.id, aliceNode.database)).isNotNull - assertThat(getReceiverRecoveryData(stx.id, bobNode.database)).isNotNull + getSenderRecoveryData(stx.id, aliceNode.database).apply { + assertEquals(1, this.size) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) + assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId) + } + getReceiverRecoveryData(stx.id, bobNode.database).apply { + assertEquals(StatesToRecord.ALL_VISIBLE, this?.statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord) + assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId) + assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), this?.peersToStatesToRecord) + } } - private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): DistributionRecord? { + @Test(timeout=300_000) + fun `two phase finality flow payment transaction with observers`() { + val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY) + val charlieNode = createNode(CHARLIE_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY) + + // issue some cash + aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx + + // standard issuance with observers passed in as FinalityFlow sessions + val stx = aliceNode.startFlowAndRunNetwork(CashPaymentWithObserversFlow( + amount = Amount(100L, GBP), + recipient = bobNode.info.singleIdentity(), + observers = setOf(charlieNode.info.singleIdentity()))).resultFuture.getOrThrow() + + assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + assertThat(charlieNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + + getSenderRecoveryData(stx.id, aliceNode.database).apply { + assertEquals(2, this.size) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) + assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[1].statesToRecord) + assertEquals(CHARLIE_NAME.hashCode().toLong(), this[1].peerPartyId) + } + getReceiverRecoveryData(stx.id, bobNode.database).apply { + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord) + assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId) + // note: Charlie assertion here is using the hinted StatesToRecord value passed to it from Alice + assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT, + CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ALL_VISIBLE), this?.peersToStatesToRecord) + } + getReceiverRecoveryData(stx.id, charlieNode.database).apply { + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord) + // note: Charlie assertion here is using actually default StatesToRecord.ONLY_RELEVANT + assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT, + CHARLIE_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), this?.peersToStatesToRecord) + } + + // exercise the new FinalityFlow observerSessions constructor parameter + val stx3 = aliceNode.startFlowAndRunNetwork(CashPaymentWithObserversFlow( + amount = Amount(100L, GBP), + recipient = bobNode.info.singleIdentity(), + observers = setOf(charlieNode.info.singleIdentity()), + useObserverSessions = true)).resultFuture.getOrThrow() + + assertThat(aliceNode.services.validatedTransactions.getTransaction(stx3.id)).isNotNull + assertThat(bobNode.services.validatedTransactions.getTransaction(stx3.id)).isNotNull + assertThat(charlieNode.services.validatedTransactions.getTransaction(stx3.id)).isNotNull + + assertEquals(2, getSenderRecoveryData(stx3.id, aliceNode.database).size) + assertThat(getReceiverRecoveryData(stx3.id, bobNode.database)).isNotNull + assertThat(getReceiverRecoveryData(stx3.id, charlieNode.database)).isNotNull + } + + @Test(timeout=300_000) + fun `two phase finality flow payment transaction using confidential identities`() { + val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY) + + aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx + val stx = aliceNode.startFlowAndRunNetwork(CashPaymentFlow( + amount = Amount(100L, GBP), + recipient = bobNode.info.singleIdentity(), + anonymous = true)).resultFuture.getOrThrow().stx + + assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + + getSenderRecoveryData(stx.id, aliceNode.database).apply { + assertEquals(1, this.size) + assertEquals(StatesToRecord.ONLY_RELEVANT, this[0].statesToRecord) + assertEquals(BOB_NAME.hashCode().toLong(), this[0].peerPartyId) + } + getReceiverRecoveryData(stx.id, bobNode.database).apply { + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.statesToRecord) + assertEquals(StatesToRecord.ONLY_RELEVANT, this?.senderStatesToRecord) + assertEquals(aliceNode.info.singleIdentity().name.hashCode().toLong(), this?.initiatorPartyId) + assertEquals(mapOf(BOB_NAME.hashCode().toLong() to StatesToRecord.ONLY_RELEVANT), this?.peersToStatesToRecord) + } + } + + private fun getSenderRecoveryData(id: SecureHash, database: CordaPersistence): List { val fromDb = database.transaction { session.createQuery( "from ${DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java.name} where tx_id = :transactionId", DBTransactionStorageLedgerRecovery.DBSenderDistributionRecord::class.java ).setParameter("transactionId", id.toString()).resultList.map { it } } - return fromDb.singleOrNull()?.toSenderDistributionRecord() + return fromDb.map { it.toSenderDistributionRecord() }.also { println("SenderDistributionRecord\n$it") } } - private fun getReceiverRecoveryData(id: SecureHash, database: CordaPersistence): DistributionRecord? { + private fun getReceiverRecoveryData(id: SecureHash, database: CordaPersistence): ReceiverDistributionRecord? { val fromDb = database.transaction { session.createQuery( "from ${DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java.name} where tx_id = :transactionId", DBTransactionStorageLedgerRecovery.DBReceiverDistributionRecord::class.java ).setParameter("transactionId", id.toString()).resultList.map { it } } - return fromDb.singleOrNull()?.toReceiverDistributionRecord(MockCryptoService(emptyMap())) + return fromDb.singleOrNull()?.toReceiverDistributionRecord(MockCryptoService(emptyMap())).also { println("ReceiverDistributionRecord\n$it") } } @StartableByRPC @@ -445,12 +541,10 @@ class FinalityFlowTests : WithFinality { override fun call(): SignedTransaction { // Mimic ReceiveFinalityFlow but fail to finalise try { - val stx = subFlow(ReceiveTransactionFlow(otherSideSession, - checkSufficientSignatures = false, statesToRecord = StatesToRecord.ONLY_RELEVANT, deferredAck = true)) + val stx = subFlow(ReceiveTransactionFlow(otherSideSession, false, StatesToRecord.ONLY_RELEVANT, true)) require(NotarySigCheck.needsNotarySignature(stx)) logger.info("Peer recording transaction without notary signature.") - (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx, - TransactionMetadata(otherSideSession.counterparty.name, StatesToRecord.ONLY_RELEVANT)) + (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx) otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck) logger.info("Peer recorded transaction without notary signature.") @@ -494,7 +588,8 @@ class FinalityFlowTests : WithFinality { val txBuilder = DummyContract.move(stateAndRef, newOwner) val stxn = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey) val sessionWithCounterParty = initiateFlow(newOwner) - subFlow(SendTransactionFlow(sessionWithCounterParty, stxn)) + subFlow(SendTransactionFlow(sessionWithCounterParty, stxn, + TransactionMetadata(ourIdentity.name, DistributionList(StatesToRecord.ONLY_RELEVANT, mapOf(BOB_NAME to StatesToRecord.ONLY_RELEVANT))))) throw UnexpectedFlowEndException("${stxn.id}") } } @@ -514,6 +609,11 @@ class FinalityFlowTests : WithFinality { version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion))) } + private fun createNode(legalName: CordaX500Name, cordapps: List = emptyList(), platformVersion: Int = PLATFORM_VERSION): TestStartedNode { + return mockNet.createNode(InternalMockNodeParameters(legalName = legalName, additionalCordapps = cordapps, + version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion))) + } + private fun TestStartedNode.issuesCashTo(recipient: TestStartedNode): SignedTransaction { return issuesCashTo(recipient.info.singleIdentity()) } diff --git a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt index 9219edccd4..4770244c72 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt @@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.isFulfilledBy import net.corda.core.flows.NotarySigCheck.needsNotarySignature +import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.groupAbstractPartyByWellKnownParty import net.corda.core.internal.FetchDataFlow @@ -15,6 +16,7 @@ import net.corda.core.internal.pushToLoggingContext import net.corda.core.internal.telemetry.telemetryServiceInternal import net.corda.core.internal.warnOnce import net.corda.core.node.StatesToRecord +import net.corda.core.node.StatesToRecord.ALL_VISIBLE import net.corda.core.node.StatesToRecord.ONLY_RELEVANT import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.SignedTransaction @@ -41,6 +43,9 @@ import java.time.Duration * can also be included, but they must specify [StatesToRecord.ALL_VISIBLE] for statesToRecord if they wish to record the * contract states into their vaults. * + * As of 4.11 a list of observer [FlowSession] can be specified to indicate sessions with transaction non-participants (e.g. observers). + * This enables ledger recovery to default these sessions associated StatesToRecord value to [StatesToRecord.ALL_VISIBLE]. + * * The flow returns the same transaction but with the additional signatures from the notary. * * NOTE: This is an inlined flow but for backwards compatibility is annotated with [InitiatingFlow]. @@ -55,13 +60,14 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, override val progressTracker: ProgressTracker, private val sessions: Collection, private val newApi: Boolean, - private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic() { + private val statesToRecord: StatesToRecord = ONLY_RELEVANT, + private val observerSessions: Collection = emptySet()) : FlowLogic() { @CordaInternal - data class ExtraConstructorArgs(val oldParticipants: Collection, val sessions: Collection, val newApi: Boolean, val statesToRecord: StatesToRecord) + data class ExtraConstructorArgs(val oldParticipants: Collection, val sessions: Collection, val newApi: Boolean, val statesToRecord: StatesToRecord, val observerSessions: Collection) @CordaInternal - fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord) + fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord, observerSessions) @Deprecated(DEPRECATION_MSG) constructor(transaction: SignedTransaction, extraRecipients: Set, progressTracker: ProgressTracker) : this( @@ -133,6 +139,10 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, progressTracker: ProgressTracker ) : this(transaction, oldParticipants, progressTracker, sessions, true) + constructor(transaction: SignedTransaction, + sessions: Collection, + observerSessions: Collection) : this(transaction, emptyList(), tracker(), sessions, true, observerSessions = observerSessions) + companion object { private const val DEPRECATION_MSG = "It is unsafe to use this constructor as it requires nodes to automatically " + "accept notarised transactions without first checking their relevancy. Instead, use one of the constructors " + @@ -158,6 +168,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, BROADCASTING_NOTARY_ERROR, FINALISING_TRANSACTION, BROADCASTING) } + private lateinit var externalTxParticipants: Set + private lateinit var txnMetadata: TransactionMetadata + @Suspendable @Suppress("ComplexMethod", "NestedBlockDepth") @Throws(NotaryException::class) @@ -169,6 +182,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, require(sessions.none { serviceHub.myInfo.isLegalIdentity(it.counterparty) }) { "Do not provide flow sessions for the local node. FinalityFlow will record the notarised transaction locally." } + sessions.intersect(observerSessions.toSet()).let { + require(it.isEmpty()) { "The following parties are specified both in flow sessions and observer flow sessions: $it" } + } } // Note: this method is carefully broken up to minimize the amount of data reachable from the stack at @@ -179,7 +195,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, transaction.pushToLoggingContext() logCommandData() val ledgerTransaction = verifyTx() - val externalTxParticipants = extractExternalParticipants(ledgerTransaction) + externalTxParticipants = extractExternalParticipants(ledgerTransaction) if (newApi) { val sessionParties = sessions.map { it.counterparty }.toSet() @@ -199,12 +215,14 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, // - broadcast notary signature to external participants (finalise remotely) // - finalise locally - val (oldPlatformSessions, newPlatformSessions) = sessions.partition { + val (oldPlatformSessions, newPlatformSessions) = (sessions + observerSessions).partition { serviceHub.networkMapCache.getNodeByLegalIdentity(it.counterparty)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY } val requiresNotarisation = needsNotarySignature(transaction) val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY + txnMetadata = TransactionMetadata(serviceHub.myInfo.legalIdentities.first().name, + DistributionList(statesToRecord, deriveStatesToRecord(newPlatformSessions))) if (useTwoPhaseFinality) { val stxn = if (requiresNotarisation) { recordLocallyAndBroadcast(newPlatformSessions, transaction) @@ -226,11 +244,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } else { if (newPlatformSessions.isNotEmpty()) - finaliseLocallyAndBroadcast(newPlatformSessions, transaction, - TransactionMetadata( - serviceHub.myInfo.legalIdentities.first().name, - statesToRecord, - sessions.map { it.counterparty.name }.toSet())) + finaliseLocallyAndBroadcast(newPlatformSessions, transaction) else recordTransactionLocally(transaction) transaction @@ -258,9 +272,9 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } @Suspendable - private fun finaliseLocallyAndBroadcast(sessions: Collection, tx: SignedTransaction, metadata: TransactionMetadata) { + private fun finaliseLocallyAndBroadcast(sessions: Collection, tx: SignedTransaction) { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocallyAndBroadcast", flowLogic = this) { - finaliseLocally(tx, metadata = metadata) + finaliseLocally(tx) progressTracker.currentStep = BROADCASTING broadcast(sessions, tx) } @@ -272,7 +286,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, sessions.forEach { session -> try { logger.debug { "Sending transaction to party $session." } - subFlow(SendTransactionFlow(session, tx)) + subFlow(SendTransactionFlow(session, tx, txnMetadata)) + txnMetadata = txnMetadata.copy(persist = false) } catch (e: UnexpectedFlowEndException) { throw UnexpectedFlowEndException( "${session.counterparty} has finished prematurely and we're trying to send them a transaction." + @@ -285,6 +300,13 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } } + private fun deriveStatesToRecord(newPlatformSessions: Collection): Map { + val derivedObserverSessions = newPlatformSessions.map { it.counterparty }.toSet() - externalTxParticipants + val txParticipantSessions = externalTxParticipants + return txParticipantSessions.map { it.name to ONLY_RELEVANT }.toMap() + + (derivedObserverSessions + observerSessions.map { it.counterparty }).map { it.name to ALL_VISIBLE } + } + @Suspendable private fun broadcastSignaturesAndFinalise(sessions: Collection, notarySignatures: List) { progressTracker.currentStep = BROADCASTING_POST_NOTARISATION @@ -309,12 +331,11 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } @Suspendable - private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List = emptyList(), - metadata: TransactionMetadata? = null) { + private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List = emptyList()) { progressTracker.currentStep = FINALISING_TRANSACTION serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocally", flowLogic = this) { if (notarySignatures.isEmpty()) { - (serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord, metadata!!) + (serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord) logger.info("Finalised transaction locally.") } else { (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) @@ -355,7 +376,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, for (session in sessions) { try { logger.debug { "Sending transaction to party $session." } - subFlow(SendTransactionFlow(session, tx)) + subFlow(SendTransactionFlow(session, tx, txnMetadata)) + txnMetadata = txnMetadata.copy(persist = false) } catch (e: UnexpectedFlowEndException) { throw UnexpectedFlowEndException( "${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " + @@ -378,7 +400,8 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, if (!serviceHub.myInfo.isLegalIdentity(recipient)) { logger.debug { "Sending transaction to party $recipient." } val session = initiateFlow(recipient) - subFlow(SendTransactionFlow(session, notarised)) + subFlow(SendTransactionFlow(session, notarised, txnMetadata)) + txnMetadata = txnMetadata.copy(persist = false) logger.info("Party $recipient received the transaction.") } } @@ -404,11 +427,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, private fun recordUnnotarisedTransaction(tx: SignedTransaction): SignedTransaction { progressTracker.currentStep = RECORD_UNNOTARISED serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) { - (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx, - TransactionMetadata( - serviceHub.myInfo.legalIdentities.first().name, - statesToRecord, - sessions.map { it.counterparty.name }.toSet())) + (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx) logger.info("Recorded un-notarised transaction locally.") return tx } @@ -487,7 +506,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession @Suppress("ComplexMethod", "NestedBlockDepth") @Suspendable override fun call(): SignedTransaction { - val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true)) + val stx = subFlow(ReceiveTransactionFlow(otherSideSession, false, statesToRecord, true)) val requiresNotarisation = needsNotarySignature(stx) val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY @@ -495,8 +514,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession if (requiresNotarisation) { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) { logger.debug { "Peer recording transaction without notary signature." } - (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx, - TransactionMetadata(otherSideSession.counterparty.name, statesToRecord)) + (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx) } otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck) logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.") @@ -522,8 +540,7 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession } } else { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) { - (serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord, - TransactionMetadata(otherSideSession.counterparty.name, statesToRecord)) + (serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord) logger.info("Peer recorded transaction with recovery metadata.") } otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt index 8f0c4a901a..532a90299d 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt @@ -24,8 +24,14 @@ data class FlowTransactionInfo( @CordaSerializable data class TransactionMetadata( val initiator: CordaX500Name, - val statesToRecord: StatesToRecord? = StatesToRecord.ONLY_RELEVANT, - val peers: Set? = null + val distributionList: DistributionList, + val persist: Boolean = true // hint to persist to transactional store +) + +@CordaSerializable +data class DistributionList( + val senderStatesToRecord: StatesToRecord, + val peersToStatesToRecord: Map ) @CordaSerializable diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt index 4f5d04b6d9..6b46c96de5 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt @@ -1,8 +1,13 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable -import net.corda.core.contracts.* +import net.corda.core.contracts.AttachmentResolutionException +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.TransactionResolutionException +import net.corda.core.contracts.TransactionVerificationException import net.corda.core.internal.ResolveTransactionsFlow +import net.corda.core.internal.ServiceHubCoreInternal import net.corda.core.internal.checkParameterHash import net.corda.core.internal.pushToLoggingContext import net.corda.core.node.StatesToRecord @@ -53,19 +58,23 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow } else { logger.trace { "Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}" } } - val stx = otherSideSession.receive().unwrap { - it.pushToLoggingContext() - logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.") - checkParameterHash(it.networkParametersHash) - subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord, deferredAck)) - logger.info("Transaction dependencies resolution completed.") - try { - it.verify(serviceHub, checkSufficientSignatures) - it - } catch (e: Exception) { - logger.warn("Transaction verification failed.") - throw e - } + + val payload = otherSideSession.receive().unwrap { it } + val stx = + if (payload is SignedTransactionWithDistributionList) { + recordTransactionMetadata(payload.stx, payload.distributionList) + payload.stx + } else payload as SignedTransaction + stx.pushToLoggingContext() + logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.") + checkParameterHash(stx.networkParametersHash) + subFlow(ResolveTransactionsFlow(stx, otherSideSession, statesToRecord, deferredAck)) + logger.info("Transaction dependencies resolution completed.") + try { + stx.verify(serviceHub, checkSufficientSignatures) + } catch (e: Exception) { + logger.warn("Transaction verification failed.") + throw e } if (checkSufficientSignatures) { // We should only send a transaction to the vault for processing if we did in fact fully verify it, and @@ -78,6 +87,21 @@ open class ReceiveTransactionFlow constructor(private val otherSideSession: Flow return stx } + @Suspendable + private fun recordTransactionMetadata(stx: SignedTransaction, distributionList: DistributionList?) { + distributionList?.let { + val txnMetadata = TransactionMetadata(otherSideSession.counterparty.name, + DistributionList(distributionList.senderStatesToRecord, + distributionList.peersToStatesToRecord.map { (peer, peerStatesToRecord) -> + if (peer == ourIdentity.name) + peer to statesToRecord // use actual value + else + peer to peerStatesToRecord // use hinted value + }.toMap())) + (serviceHub as ServiceHubCoreInternal).recordTransactionRecoveryMetadata(stx.id, txnMetadata, ourIdentity.name) + } + } + /** * Hook to perform extra checks on the received transaction just before it's recorded. The transaction has already * been resolved and verified at this point. diff --git a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt index 233a89236b..68f89469d2 100644 --- a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt @@ -4,14 +4,24 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.NamedByHash import net.corda.core.contracts.StateAndRef import net.corda.core.crypto.SecureHash +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.* +import net.corda.core.node.StatesToRecord import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.unwrap import net.corda.core.utilities.trace +import net.corda.core.utilities.unwrap +import kotlin.collections.List +import kotlin.collections.MutableSet +import kotlin.collections.Set +import kotlin.collections.flatMap +import kotlin.collections.map +import kotlin.collections.mutableSetOf +import kotlin.collections.plus +import kotlin.collections.toSet /** * In the words of Matt working code is more important then pretty code. This class that contains code that may @@ -66,8 +76,16 @@ class MaybeSerializedSignedTransaction(override val id: SecureHash, val serializ * * @param otherSide the target party. * @param stx the [SignedTransaction] being sent to the [otherSideSession]. + * @property txnMetadata transaction recovery metadata (eg. used by Two Phase Finality). */ -open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction) : DataVendingFlow(otherSide, stx) +open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction, txnMetadata: TransactionMetadata) : DataVendingFlow(otherSide, stx, txnMetadata) { + constructor(otherSide: FlowSession, stx: SignedTransaction) : this(otherSide, stx, + TransactionMetadata(DUMMY_PARTICIPANT_NAME, DistributionList(StatesToRecord.NONE, mapOf(otherSide.counterparty.name to StatesToRecord.ALL_VISIBLE)))) + // Note: DUMMY_PARTICIPANT_NAME to be substituted with actual "ourIdentity.name" in flow call() + companion object { + val DUMMY_PARTICIPANT_NAME = CordaX500Name("Transaction Participant", "London", "GB") + } +} /** * The [SendStateAndRefFlow] should be used to send a list of input [StateAndRef] to another peer that wishes to verify @@ -80,7 +98,9 @@ open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction) : */ open class SendStateAndRefFlow(otherSideSession: FlowSession, stateAndRefs: List>) : DataVendingFlow(otherSideSession, stateAndRefs) -open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) : FlowLogic() { +open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any, val txnMetadata: TransactionMetadata? = null) : FlowLogic() { + constructor(otherSideSession: FlowSession, payload: Any) : this(otherSideSession, payload, null) + @Suspendable protected open fun sendPayloadAndReceiveDataRequest(otherSideSession: FlowSession, payload: Any) = otherSideSession.sendAndReceive(payload) @@ -89,6 +109,7 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) // User can override this method to perform custom request verification. } + @Suppress("ComplexCondition", "ComplexMethod") @Suspendable override fun call(): Void? { val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize @@ -117,6 +138,15 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) else -> throw Exception("Unknown payload type: ${payload::class.java} ?") } + // store and share transaction recovery metadata if required + val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY + val toTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY + if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) { + payload = SignedTransactionWithDistributionList(payload, txnMetadata.distributionList) + if (txnMetadata.persist) + (serviceHub as ServiceHubCoreInternal).recordTransactionRecoveryMetadata(payload.stx.id, txnMetadata.copy(initiator = ourIdentity.name), ourIdentity.name) + } + // This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need // to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of // data request. @@ -240,3 +270,9 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) } } } + +@CordaSerializable +data class SignedTransactionWithDistributionList( + val stx: SignedTransaction, + val distributionList: DistributionList +) \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt index 264353f932..239c37166e 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt @@ -5,6 +5,7 @@ import net.corda.core.DeleteForDJVM import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.TransactionMetadata +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.notary.NotaryService import net.corda.core.node.ServiceHub import net.corda.core.node.StatesToRecord @@ -35,9 +36,8 @@ interface ServiceHubCoreInternal : ServiceHub { * This is expected to be run within a database transaction. * * @param txn The transaction to record. - * @param metadata Finality flow recovery metadata. */ - fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) + fun recordUnnotarisedTransaction(txn: SignedTransaction) /** * Removes transaction from data store. @@ -61,9 +61,17 @@ interface ServiceHubCoreInternal : ServiceHub { * * @param txn The transaction to record. * @param statesToRecord how the vault should treat the output states of the transaction. - * @param metadata Finality flow recovery metadata. */ - fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) + fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord) + + /** + * Records [TransactionMetadata] for a given txnId. + * + * @param txnId The SecureHash of a transaction. + * @param txnMetadata The recovery metadata associated with a transaction. + * @param caller The CordaX500Name of the party calling this operation. + */ + fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) } interface TransactionsResolver { diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 30163d6442..4b3456d084 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -14,6 +14,7 @@ import net.corda.core.internal.PlatformVersionSwitches.TWO_PHASE_FINALITY import net.corda.core.internal.telemetry.TelemetryComponent import net.corda.core.node.services.* import net.corda.core.node.services.diagnostics.DiagnosticsService +import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializeAsToken import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.LedgerTransaction @@ -90,6 +91,7 @@ interface ServicesForResolution { * Controls whether the transaction is sent to the vault at all, and if so whether states have to be relevant * or not in order to be recorded. Used in [ServiceHub.recordTransactions] */ +@CordaSerializable enum class StatesToRecord { /** The received transaction is not sent to the vault at all. This is used within transaction resolution. */ NONE, diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 4d2e1e7b44..fb7a6d9f16 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -8,6 +8,7 @@ import net.corda.core.flows.FlowLogic import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.TransactionStatus +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.ResolveTransactionsFlow @@ -194,6 +195,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal { override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) = recordTransactions(statesToRecord, txs, SIGNATURE_VERIFICATION_DISABLED) + override fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) = + validatedTransactions.addTransactionRecoveryMetadata(txnId, txnMetadata, caller) + @Suppress("NestedBlockDepth") @VisibleForTesting fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable, disableSignatureVerification: Boolean) { @@ -240,27 +244,25 @@ interface ServiceHubInternal : ServiceHubCoreInternal { ) } - override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) { + override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord) { requireSupportedHashType(txn) if (txn.coreTransaction is WireTransaction) txn.verifyRequiredSignatures() database.transaction { recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) { - val isInitiator = metadata.initiator == myInfo.legalIdentities.first().name - validatedTransactions.finalizeTransaction(txn, metadata, isInitiator) + validatedTransactions.finalizeTransaction(txn) } } } - override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) { + override fun recordUnnotarisedTransaction(txn: SignedTransaction) { if (txn.coreTransaction is WireTransaction) { txn.notary?.let { notary -> txn.verifySignaturesExcept(notary.owningKey) } ?: txn.verifyRequiredSignatures() } database.transaction { - val isInitiator = metadata.initiator == myInfo.legalIdentities.first().name - validatedTransactions.addUnnotarisedTransaction(txn, metadata, isInitiator) + validatedTransactions.addUnnotarisedTransaction(txn) } } @@ -360,10 +362,18 @@ interface WritableTransactionStorage : TransactionStorage { * Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG* and inclusive of flow recovery metadata. * * @param transaction The transaction to be recorded. - * @param metadata Finality flow recovery metadata. * @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists. */ - fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean + fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean + + /** + * Record transaction recovery metadata for a given transaction id. + * + * @param id The SecureHash of the transaction to be recorded. + * @param metadata transaction recovery metadata. + * @param caller The CordaX500Name of the party calling this operation. + */ + fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) /** * Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store. @@ -375,10 +385,9 @@ interface WritableTransactionStorage : TransactionStorage { * Add a finalised transaction to the store with flow recovery metadata. * * @param transaction The transaction to be recorded. - * @param metadata Finality flow recovery metadata. * @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists. */ - fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean + fun finalizeTransaction(transaction: SignedTransaction): Boolean /** * Update a previously un-notarised transaction including associated notary signatures. diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt index bb3c9aed9f..e9ba12a3a6 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentPartyInfoCache.kt @@ -56,7 +56,6 @@ class PersistentPartyInfoCache(private val networkMapCache: PersistentNetworkMap private fun updateInfoDB(partyHashCode: Long, partyName: CordaX500Name) { database.transaction { if (queryByPartyId(session, partyHashCode) == null) { - println("PartyInfo: $partyHashCode -> $partyName") session.save(DBTransactionStorageLedgerRecovery.DBRecoveryPartyInfo(partyHashCode, partyName.toString())) } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index b78e2adb35..597d1a8675 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -4,6 +4,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.TransactionMetadata +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.ThreadBox import net.corda.core.internal.VisibleForTesting @@ -208,12 +209,14 @@ open class DBTransactionStorage(private val database: CordaPersistence, cacheFac updateTransaction(transaction.id) } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) = + override fun addUnnotarisedTransaction(transaction: SignedTransaction) = addTransaction(transaction, TransactionStatus.IN_FLIGHT) { false } - override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) = + override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { } + + override fun finalizeTransaction(transaction: SignedTransaction) = addTransaction(transaction) { false } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt index f7b65472ac..5abbff52a4 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecovery.kt @@ -10,7 +10,6 @@ import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.core.transactions.SignedTransaction import net.corda.node.CordaClock import net.corda.node.services.network.PersistentPartyInfoCache import net.corda.nodeapi.internal.cryptoservice.CryptoService @@ -19,6 +18,7 @@ import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.serialization.internal.CordaSerializationEncoding import org.hibernate.annotations.Immutable import java.io.Serializable +import java.lang.IllegalStateException import java.time.Instant import java.util.concurrent.atomic.AtomicLong import javax.persistence.Column @@ -100,13 +100,13 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, @Column(name = "sender_states_to_record", nullable = false) val senderStatesToRecord: StatesToRecord ) { - constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, peerPartyIds: Set, statesToRecord: StatesToRecord, cryptoService: CryptoService) : + constructor(key: Key, txId: SecureHash, initiatorPartyId: Long, peersToStatesToRecord: Map, senderStatesToRecord: StatesToRecord, receiverStatesToRecord: StatesToRecord, cryptoService: CryptoService) : this(PersistentKey(key), txId = txId.toString(), senderPartyId = initiatorPartyId, - distributionList = cryptoService.encrypt(peerPartyIds.serialize(context = contextToUse().withEncoding(CordaSerializationEncoding.SNAPPY)).bytes), - receiverStatesToRecord = statesToRecord, - senderStatesToRecord = StatesToRecord.NONE // to be set in follow-up PR. + distributionList = cryptoService.encrypt(peersToStatesToRecord.serialize(context = contextToUse().withEncoding(CordaSerializationEncoding.SNAPPY)).bytes), + receiverStatesToRecord = receiverStatesToRecord, + senderStatesToRecord = senderStatesToRecord ) fun toReceiverDistributionRecord(cryptoService: CryptoService) = @@ -115,6 +115,7 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, this.senderPartyId, cryptoService.decrypt(this.distributionList).deserialize(context = contextToUse()), this.receiverStatesToRecord, + this.senderStatesToRecord, this.compositeKey.timestamp ) } @@ -141,17 +142,33 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, } } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean { - return addTransaction(transaction, TransactionStatus.IN_FLIGHT) { - addTransactionRecoveryMetadata(transaction.id, metadata, isInitiator, clock) + @Suppress("IMPLICIT_CAST_TO_ANY") + override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { + database.transaction { + if (caller == metadata.initiator) { + metadata.distributionList.peersToStatesToRecord.map { (peer, _) -> + val senderDistributionRecord = DBSenderDistributionRecord(PersistentKey(Key(clock.instant())), + id.toString(), + partyInfoCache.getPartyIdByCordaX500Name(peer), + metadata.distributionList.senderStatesToRecord) + session.save(senderDistributionRecord) + } + } else { + val receiverStatesToRecord = metadata.distributionList.peersToStatesToRecord[caller] ?: throw IllegalStateException("Missing peer $caller in distribution list of Receiver recovery metadata") + val receiverDistributionRecord = + DBReceiverDistributionRecord(Key(clock.instant()), + id, + partyInfoCache.getPartyIdByCordaX500Name(metadata.initiator), + metadata.distributionList.peersToStatesToRecord.map { (peer, statesToRecord) -> + partyInfoCache.getPartyIdByCordaX500Name(peer) to statesToRecord }.toMap(), + metadata.distributionList.senderStatesToRecord, + receiverStatesToRecord, + cryptoService) + session.save(receiverDistributionRecord) + } } } - override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) = - addTransaction(transaction) { - addTransactionRecoveryMetadata(transaction.id, metadata, isInitiator, clock) - } - override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { return database.transaction { super.removeUnnotarisedTransaction(id) @@ -260,31 +277,6 @@ class DBTransactionStorageLedgerRecovery(private val database: CordaPersistence, results.map { it.toReceiverDistributionRecord(cryptoService) }.toList() } } - - @Suppress("IMPLICIT_CAST_TO_ANY") - private fun addTransactionRecoveryMetadata(txId: SecureHash, metadata: TransactionMetadata, isInitiator: Boolean, clock: CordaClock): Boolean { - database.transaction { - if (isInitiator) { - metadata.peers?.map { peer -> - val senderDistributionRecord = DBSenderDistributionRecord(PersistentKey(Key(clock.instant())), - txId.toString(), - partyInfoCache.getPartyIdByCordaX500Name(peer), - metadata.statesToRecord ?: StatesToRecord.ONLY_RELEVANT) - session.save(senderDistributionRecord) - } - } else { - val receiverDistributionRecord = - DBReceiverDistributionRecord(Key(clock.instant()), - txId, - partyInfoCache.getPartyIdByCordaX500Name(metadata.initiator), - metadata.peers?.map { partyInfoCache.getPartyIdByCordaX500Name(it) }?.toSet() ?: emptySet(), - metadata.statesToRecord ?: StatesToRecord.ONLY_RELEVANT, - cryptoService) - session.save(receiverDistributionRecord) - } - } - return false - } } // TO DO: https://r3-cev.atlassian.net/browse/ENT-9876 @@ -316,8 +308,9 @@ data class SenderDistributionRecord( data class ReceiverDistributionRecord( override val txId: SecureHash, val initiatorPartyId: Long, // CordaX500Name hashCode() - val peerPartyIds: Set, // CordaX500Name hashCode() + val peersToStatesToRecord: Map, // CordaX500Name hashCode() -> StatesToRecord override val statesToRecord: StatesToRecord, + val senderStatesToRecord: StatesToRecord, override val timestamp: Instant ) : DistributionRecord(txId, statesToRecord, timestamp) diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 6f87a4f525..bd25b3c512 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -801,23 +801,29 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { return true } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean { + override fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean { database.transaction { records.add(TxRecord.Add(transaction)) - delegate.addUnnotarisedTransaction(transaction, metadata, isInitiator) + delegate.addUnnotarisedTransaction(transaction) } return true } + override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { + database.transaction { + delegate.addTransactionRecoveryMetadata(id, metadata, caller) + } + } + override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { return database.transaction { delegate.removeUnnotarisedTransaction(id) } } - override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean { + override fun finalizeTransaction(transaction: SignedTransaction): Boolean { database.transaction { - delegate.finalizeTransaction(transaction, metadata, isInitiator) + delegate.finalizeTransaction(transaction) } return true } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt index 4e17a0b6f9..3766e5629f 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageLedgerRecoveryTests.kt @@ -6,10 +6,13 @@ import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SignableData import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.sign +import net.corda.core.flows.DistributionList import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.RecoveryTimeWindow import net.corda.core.node.NodeInfo -import net.corda.core.node.StatesToRecord +import net.corda.core.node.StatesToRecord.ALL_VISIBLE +import net.corda.core.node.StatesToRecord.ONLY_RELEVANT +import net.corda.core.node.StatesToRecord.NONE import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.NetworkHostAndPort @@ -79,14 +82,18 @@ class DBTransactionStorageLedgerRecoveryTests { @Test(timeout = 300_000) fun `query local ledger for transactions with recovery peers within time window`() { val beforeFirstTxn = now() - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true) + val txn = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn) + transactionRecovery.addTransactionRecoveryMetadata(txn.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME) val timeWindow = RecoveryTimeWindow(fromTime = beforeFirstTxn, untilTime = beforeFirstTxn.plus(1, ChronoUnit.MINUTES)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow) assertEquals(1, results.size) val afterFirstTxn = now() - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(CHARLIE_NAME)), true) + val txn2 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn2) + transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))), ALICE_NAME) assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow).size) assertEquals(1, transactionRecovery.querySenderDistributionRecords(RecoveryTimeWindow(fromTime = afterFirstTxn)).size) } @@ -94,9 +101,11 @@ class DBTransactionStorageLedgerRecoveryTests { @Test(timeout = 300_000) fun `query local ledger for transactions within timeWindow and excluding remoteTransactionIds`() { val transaction1 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(transaction1, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true) + transactionRecovery.addUnnotarisedTransaction(transaction1) + transactionRecovery.addTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME) val transaction2 = newTransaction() - transactionRecovery.addUnnotarisedTransaction(transaction2, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true) + transactionRecovery.addUnnotarisedTransaction(transaction2) + transactionRecovery.addTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ONLY_RELEVANT))), ALICE_NAME) val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) val results = transactionRecovery.querySenderDistributionRecords(timeWindow, excludingTxnIds = setOf(transaction1.id)) assertEquals(1, results.size) @@ -106,18 +115,22 @@ class DBTransactionStorageLedgerRecoveryTests { fun `query local ledger by distribution record type`() { val transaction1 = newTransaction() // sender txn - transactionRecovery.addUnnotarisedTransaction(transaction1, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true) + transactionRecovery.addUnnotarisedTransaction(transaction1) + transactionRecovery.addTransactionRecoveryMetadata(transaction1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME) val transaction2 = newTransaction() // receiver txn - transactionRecovery.addUnnotarisedTransaction(transaction2, TransactionMetadata(BOB_NAME, StatesToRecord.ALL_VISIBLE, setOf(ALICE_NAME)), false) + transactionRecovery.addUnnotarisedTransaction(transaction2) + transactionRecovery.addTransactionRecoveryMetadata(transaction2.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))), ALICE_NAME) val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.SENDER).let { assertEquals(1, it.size) - assertEquals((it[0] as SenderDistributionRecord).peerPartyId, BOB_NAME.hashCode().toLong()) + assertEquals(BOB_NAME.hashCode().toLong(), (it[0] as SenderDistributionRecord).peerPartyId) + assertEquals(ALL_VISIBLE, (it[0] as SenderDistributionRecord).statesToRecord) } transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.RECEIVER).let { assertEquals(1, it.size) - assertEquals((it[0] as ReceiverDistributionRecord).initiatorPartyId, BOB_NAME.hashCode().toLong()) + assertEquals(BOB_NAME.hashCode().toLong(), (it[0] as ReceiverDistributionRecord).initiatorPartyId) + assertEquals(ALL_VISIBLE, (it[0] as ReceiverDistributionRecord).statesToRecord) } val resultsAll = transactionRecovery.queryDistributionRecords(timeWindow, recordType = DistributionRecordType.ALL) assertEquals(2, resultsAll.size) @@ -125,18 +138,28 @@ class DBTransactionStorageLedgerRecoveryTests { @Test(timeout = 300_000) fun `query for sender distribution records by peers`() { - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(CHARLIE_NAME)), true) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME, CHARLIE_NAME)), true) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(BOB_NAME, StatesToRecord.ONLY_RELEVANT, setOf(ALICE_NAME)), true) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(CHARLIE_NAME, StatesToRecord.ONLY_RELEVANT), true) + val txn1 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn1) + transactionRecovery.addTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME) + val txn2 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn2) + transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ONLY_RELEVANT))), ALICE_NAME) + val txn3 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn3) + transactionRecovery.addTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT, CHARLIE_NAME to ALL_VISIBLE))), ALICE_NAME) + val txn4 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn4) + transactionRecovery.addTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ONLY_RELEVANT))), BOB_NAME) + val txn5 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn5) + transactionRecovery.addTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())), CHARLIE_NAME) assertEquals(5, readSenderDistributionRecordFromDB().size) val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(BOB_NAME)).let { assertEquals(2, it.size) - assertEquals(it[0].statesToRecord, StatesToRecord.ALL_VISIBLE) - assertEquals(it[1].statesToRecord, StatesToRecord.ONLY_RELEVANT) + assertEquals(it[0].statesToRecord, ALL_VISIBLE) + assertEquals(it[1].statesToRecord, ONLY_RELEVANT) } assertEquals(1, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(ALICE_NAME)).size) assertEquals(2, transactionRecovery.querySenderDistributionRecords(timeWindow, peers = setOf(CHARLIE_NAME)).size) @@ -144,59 +167,93 @@ class DBTransactionStorageLedgerRecoveryTests { @Test(timeout = 300_000) fun `query for receiver distribution records by initiator`() { - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME, CHARLIE_NAME)), false) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME)), false) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(ALICE_NAME, StatesToRecord.NONE, setOf(CHARLIE_NAME)), false) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(BOB_NAME, StatesToRecord.ALL_VISIBLE, setOf(ALICE_NAME)), false) - transactionRecovery.addUnnotarisedTransaction(newTransaction(), TransactionMetadata(CHARLIE_NAME, StatesToRecord.ONLY_RELEVANT), false) + val txn1 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn1) + transactionRecovery.addTransactionRecoveryMetadata(txn1.id, TransactionMetadata(ALICE_NAME, + DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE, CHARLIE_NAME to ALL_VISIBLE))), BOB_NAME) + val txn2 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn2) + transactionRecovery.addTransactionRecoveryMetadata(txn2.id, TransactionMetadata(ALICE_NAME, + DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))), BOB_NAME) + val txn3 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn3) + transactionRecovery.addTransactionRecoveryMetadata(txn3.id, TransactionMetadata(ALICE_NAME, + DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to NONE))), CHARLIE_NAME) + val txn4 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn4) + transactionRecovery.addTransactionRecoveryMetadata(txn4.id, TransactionMetadata(BOB_NAME, + DistributionList(ONLY_RELEVANT, mapOf(ALICE_NAME to ALL_VISIBLE))), ALICE_NAME) + val txn5 = newTransaction() + transactionRecovery.addUnnotarisedTransaction(txn5) + transactionRecovery.addTransactionRecoveryMetadata(txn5.id, TransactionMetadata(CHARLIE_NAME, + DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ONLY_RELEVANT))), BOB_NAME) val timeWindow = RecoveryTimeWindow(fromTime = now().minus(1, ChronoUnit.DAYS)) transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(ALICE_NAME)).let { assertEquals(3, it.size) - assertEquals(it[0].statesToRecord, StatesToRecord.ALL_VISIBLE) - assertEquals(it[1].statesToRecord, StatesToRecord.ONLY_RELEVANT) - assertEquals(it[2].statesToRecord, StatesToRecord.NONE) + assertEquals(it[0].statesToRecord, ALL_VISIBLE) + assertEquals(it[1].statesToRecord, ONLY_RELEVANT) + assertEquals(it[2].statesToRecord, NONE) } assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME)).size) assertEquals(1, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(CHARLIE_NAME)).size) assertEquals(2, transactionRecovery.queryReceiverDistributionRecords(timeWindow, initiators = setOf(BOB_NAME, CHARLIE_NAME)).size) } + @Test(timeout = 300_000) + fun `transaction without peers does not store recovery metadata in database`() { + val senderTransaction = newTransaction() + transactionRecovery.addUnnotarisedTransaction(senderTransaction) + transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, emptyMap())), ALICE_NAME) + assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status) + assertEquals(0, readSenderDistributionRecordFromDB(senderTransaction.id).size) + } + @Test(timeout = 300_000) fun `create un-notarised transaction with flow metadata and validate status in db`() { val senderTransaction = newTransaction() - transactionRecovery.addUnnotarisedTransaction(senderTransaction, TransactionMetadata(ALICE_NAME, StatesToRecord.ALL_VISIBLE, setOf(BOB_NAME)), true) + transactionRecovery.addUnnotarisedTransaction(senderTransaction) + transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, + TransactionMetadata(ALICE_NAME, DistributionList(ALL_VISIBLE, mapOf(BOB_NAME to ALL_VISIBLE))), ALICE_NAME) assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status) readSenderDistributionRecordFromDB(senderTransaction.id).let { assertEquals(1, it.size) - assertEquals(StatesToRecord.ALL_VISIBLE, it[0].statesToRecord) + assertEquals(ALL_VISIBLE, it[0].statesToRecord) assertEquals(BOB_NAME, partyInfoCache.getCordaX500NameByPartyId(it[0].peerPartyId)) } val receiverTransaction = newTransaction() - transactionRecovery.addUnnotarisedTransaction(receiverTransaction, TransactionMetadata(ALICE_NAME, StatesToRecord.ONLY_RELEVANT, setOf(BOB_NAME)), false) + transactionRecovery.addUnnotarisedTransaction(receiverTransaction) + transactionRecovery.addTransactionRecoveryMetadata(receiverTransaction.id, + TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(BOB_NAME to ALL_VISIBLE))), BOB_NAME) assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status) readReceiverDistributionRecordFromDB(receiverTransaction.id).let { - assertEquals(StatesToRecord.ONLY_RELEVANT, it.statesToRecord) + assertEquals(ALL_VISIBLE, it.statesToRecord) + assertEquals(ONLY_RELEVANT, it.senderStatesToRecord) assertEquals(ALICE_NAME, partyInfoCache.getCordaX500NameByPartyId(it.initiatorPartyId)) - assertEquals(setOf(BOB_NAME), it.peerPartyIds.map { partyInfoCache.getCordaX500NameByPartyId(it) }.toSet() ) + assertEquals(setOf(BOB_NAME), it.peersToStatesToRecord.map { (peer, _) -> partyInfoCache.getCordaX500NameByPartyId(peer) }.toSet() ) } } @Test(timeout = 300_000) fun `finalize transaction with recovery metadata`() { val transaction = newTransaction(notarySig = false) - transactionRecovery.finalizeTransaction(transaction, - TransactionMetadata(ALICE_NAME), false) - + transactionRecovery.finalizeTransaction(transaction) + transactionRecovery.addTransactionRecoveryMetadata(transaction.id, + TransactionMetadata(ALICE_NAME, DistributionList(ONLY_RELEVANT, mapOf(CHARLIE_NAME to ALL_VISIBLE))), ALICE_NAME) assertEquals(VERIFIED, readTransactionFromDB(transaction.id).status) - assertEquals(StatesToRecord.ONLY_RELEVANT, readReceiverDistributionRecordFromDB(transaction.id).statesToRecord) + readSenderDistributionRecordFromDB(transaction.id).apply { + assertEquals(1, this.size) + assertEquals(ONLY_RELEVANT, this[0].statesToRecord) + } } @Test(timeout = 300_000) fun `remove un-notarised transaction and associated recovery metadata`() { val senderTransaction = newTransaction(notarySig = false) - transactionRecovery.addUnnotarisedTransaction(senderTransaction, TransactionMetadata(ALICE.name, peers = setOf(BOB.name, CHARLIE_NAME)), true) + transactionRecovery.addUnnotarisedTransaction(senderTransaction) + transactionRecovery.addTransactionRecoveryMetadata(senderTransaction.id, TransactionMetadata(ALICE.name, + DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT, CHARLIE_NAME to ONLY_RELEVANT))), BOB.name) assertNull(transactionRecovery.getTransaction(senderTransaction.id)) assertEquals(IN_FLIGHT, readTransactionFromDB(senderTransaction.id).status) @@ -206,7 +263,9 @@ class DBTransactionStorageLedgerRecoveryTests { assertNull(transactionRecovery.getTransactionInternal(senderTransaction.id)) val receiverTransaction = newTransaction(notarySig = false) - transactionRecovery.addUnnotarisedTransaction(receiverTransaction, TransactionMetadata(ALICE.name), false) + transactionRecovery.addUnnotarisedTransaction(receiverTransaction) + transactionRecovery.addTransactionRecoveryMetadata(receiverTransaction.id, TransactionMetadata(ALICE.name, + DistributionList(ONLY_RELEVANT, mapOf(BOB.name to ONLY_RELEVANT))), ALICE.name) assertNull(transactionRecovery.getTransaction(receiverTransaction.id)) assertEquals(IN_FLIGHT, readTransactionFromDB(receiverTransaction.id).status) diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 3a4b8615b3..0d4b380375 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -10,7 +10,6 @@ import net.corda.core.crypto.SignableData import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.sign -import net.corda.core.flows.TransactionMetadata import net.corda.core.serialization.deserialize import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction @@ -109,7 +108,7 @@ class DBTransactionStorageTests { val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction() - transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true) + transactionStorage.addUnnotarisedTransaction(transaction) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) } @@ -132,7 +131,7 @@ class DBTransactionStorageTests { val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction(notarySig = false) - transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true) + transactionStorage.addUnnotarisedTransaction(transaction) assertNull(transactionStorage.getTransaction(transaction.id)) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) transactionStorage.finalizeTransactionWithExtraSignatures(transaction, emptyList()) @@ -148,7 +147,7 @@ class DBTransactionStorageTests { val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction(notarySig = false) - transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true) + transactionStorage.addUnnotarisedTransaction(transaction) assertNull(transactionStorage.getTransaction(transaction.id)) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) val notarySig = notarySig(transaction.id) @@ -165,7 +164,7 @@ class DBTransactionStorageTests { val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction(notarySig = false) - transactionStorage.addUnnotarisedTransaction(transaction, TransactionMetadata(ALICE.party.name), true) + transactionStorage.addUnnotarisedTransaction(transaction) assertNull(transactionStorage.getTransaction(transaction.id)) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) @@ -199,7 +198,7 @@ class DBTransactionStorageTests { val transactionWithoutNotarySig = newTransaction(notarySig = false) // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow) - transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig, TransactionMetadata(ALICE.party.name), false) + transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig) assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySig.id).status) // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow) @@ -230,7 +229,7 @@ class DBTransactionStorageTests { val transactionWithoutNotarySigs = newTransaction(notarySig = false) // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow) - transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs, TransactionMetadata(ALICE.party.name), false) + transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs) assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySigs.id).status) // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow) diff --git a/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt b/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt index 9829f0e4c5..c860617078 100644 --- a/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt +++ b/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt @@ -2,17 +2,22 @@ package net.corda.finance.test.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.Amount +import net.corda.core.flows.FinalityFlow import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.NotaryException import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.StartableByRPC import net.corda.core.identity.Party +import net.corda.core.node.StatesToRecord +import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.OpaqueBytes import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.AbstractCashFlow +import net.corda.finance.flows.CashException import net.corda.finance.issuedBy import java.util.Currency @@ -32,9 +37,18 @@ class CashIssueWithObserversFlow(private val amount: Amount, val tx = serviceHub.signInitialTransaction(builder, signers) progressTracker.currentStep = Companion.FINALISING_TX val observerSessions = observers.map { initiateFlow(it) } - val notarised = finaliseTx(tx, observerSessions, "Unable to notarise issue") + val notarised = finalise(tx, observerSessions, "Unable to notarise issue") return Result(notarised, ourIdentity) } + + @Suspendable + private fun finalise(tx: SignedTransaction, sessions: Collection, message: String): SignedTransaction { + try { + return subFlow(FinalityFlow(tx, sessions)) + } catch (e: NotaryException) { + throw CashException(message, e) + } + } } @InitiatedBy(CashIssueWithObserversFlow::class) @@ -42,7 +56,7 @@ class CashIssueReceiverFlowWithObservers(private val otherSide: FlowSession) : F @Suspendable override fun call() { if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) { - subFlow(ReceiveFinalityFlow(otherSide)) + subFlow(ReceiveFinalityFlow(otherSide, statesToRecord = StatesToRecord.ALL_VISIBLE)) } } } \ No newline at end of file diff --git a/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashPaymentWithObserversFlow.kt b/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashPaymentWithObserversFlow.kt new file mode 100644 index 0000000000..5192ac3ad4 --- /dev/null +++ b/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashPaymentWithObserversFlow.kt @@ -0,0 +1,82 @@ +package net.corda.finance.test.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.Amount +import net.corda.core.contracts.InsufficientBalanceException +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.NotaryException +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.finance.flows.AbstractCashFlow +import net.corda.finance.flows.CashException +import net.corda.finance.workflows.asset.CashUtils +import java.util.Currency + +@StartableByRPC +@InitiatingFlow +open class CashPaymentWithObserversFlow( + val amount: Amount, + val recipient: Party, + val observers: Set, + private val useObserverSessions: Boolean = false +) : AbstractCashFlow(tracker()) { + + @Suspendable + override fun call(): SignedTransaction { + val recipientSession = initiateFlow(recipient) + val observerSessions = observers.map { initiateFlow(it) } + val builder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first()) + logger.info("Generating spend for: ${builder.lockId}") + val (spendTX, keysForSigning) = try { + CashUtils.generateSpend( + serviceHub, + builder, + amount, + ourIdentityAndCert, + recipient + ) + } catch (e: InsufficientBalanceException) { + throw CashException("Insufficient cash for spend: ${e.message}", e) + } + + logger.info("Signing transaction for: ${spendTX.lockId}") + val tx = serviceHub.signInitialTransaction(spendTX, keysForSigning) + + logger.info("Finalising transaction for: ${tx.id}") + val sessionsForFinality = if (serviceHub.myInfo.isLegalIdentity(recipient)) emptyList() else listOf(recipientSession) + val notarised = finalise(tx, sessionsForFinality, observerSessions) + logger.info("Finalised transaction for: ${notarised.id}") + return notarised + } + + @Suspendable + private fun finalise(tx: SignedTransaction, + sessions: Collection, + observerSessions: Collection): SignedTransaction { + try { + return if (useObserverSessions) + subFlow(FinalityFlow(tx, sessions, observerSessions = observerSessions)) + else + subFlow(FinalityFlow(tx, sessions + observerSessions)) + } catch (e: NotaryException) { + throw CashException("Unable to notarise spend", e) + } + } +} + +@InitiatedBy(CashPaymentWithObserversFlow::class) +class CashPaymentReceiverWithObserversFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) { + subFlow(ReceiveFinalityFlow(otherSide)) + } + } +} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt index 156651efd7..d9f26e9469 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt @@ -11,6 +11,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.node.services.api.WritableTransactionStorage import net.corda.core.flows.TransactionMetadata import net.corda.core.flows.TransactionStatus +import net.corda.core.identity.CordaX500Name import net.corda.testing.node.MockServices import rx.Observable import rx.subjects.PublishSubject @@ -55,15 +56,17 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali } } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean): Boolean { + override fun addUnnotarisedTransaction(transaction: SignedTransaction): Boolean { return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.IN_FLIGHT)) == null } + override fun addTransactionRecoveryMetadata(id: SecureHash, metadata: TransactionMetadata, caller: CordaX500Name) { } + override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { return txns.remove(id) != null } - override fun finalizeTransaction(transaction: SignedTransaction, metadata: TransactionMetadata, isInitiator: Boolean) = + override fun finalizeTransaction(transaction: SignedTransaction) = addTransaction(transaction) override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection): Boolean { diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt index 5d0f74d25e..f7858056e5 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt @@ -9,6 +9,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.FlowException import net.corda.core.flows.TransactionMetadata +import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.internal.notary.NotaryService @@ -139,13 +140,15 @@ data class TestTransactionDSLInterpreter private constructor( override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory()) - override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: TransactionMetadata) {} + override fun recordUnnotarisedTransaction(txn: SignedTransaction) {} override fun removeUnnotarisedTransaction(id: SecureHash) {} override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection, statesToRecord: StatesToRecord) {} - override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: TransactionMetadata) {} + override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord) {} + + override fun recordTransactionRecoveryMetadata(txnId: SecureHash, txnMetadata: TransactionMetadata, caller: CordaX500Name) {} } private fun copy(): TestTransactionDSLInterpreter =