From c3e39a7052d405e92ef1a846dc99809157d75dce Mon Sep 17 00:00:00 2001 From: Jose Coll Date: Thu, 27 Apr 2023 16:58:17 +0100 Subject: [PATCH] ENT-9842 Re-factor 2PF to support issuance transactions (no notarisation) with observers. (#7349) Re-factor 2PF to support issuance transactions (no notarisation) with observers. --- core-tests/build.gradle | 2 + .../coretests/flows/FinalityFlowTests.kt | 23 ++- .../net/corda/core/flows/FinalityFlow.kt | 187 +++++++++++------- .../net/corda/core/flows/FlowTransaction.kt | 2 +- .../core/internal/ServiceHubCoreInternal.kt | 16 +- node/build.gradle | 3 + .../flows/FinalityFlowErrorHandlingTest.kt | 101 ++++++++++ .../StateMachineErrorHandlingTest.kt | 7 +- .../services/statemachine/FlowHospitalTest.kt | 1 + .../node/services/api/ServiceHubInternal.kt | 30 ++- .../persistence/DBTransactionStorage.kt | 50 ++--- .../node/messaging/TwoPartyTradeFlowTests.kt | 11 +- .../persistence/DBTransactionStorageTests.kt | 55 ++++-- settings.gradle | 1 + testing/cordapps/cashobservers/build.gradle | 17 ++ .../test/flows/CashIssueWithObserversFlow.kt | 48 +++++ .../node/internal/MockTransactionStorage.kt | 7 +- .../kotlin/net/corda/testing/dsl/TestDSL.kt | 4 +- 18 files changed, 433 insertions(+), 132 deletions(-) create mode 100644 node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt create mode 100644 testing/cordapps/cashobservers/build.gradle create mode 100644 testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt diff --git a/core-tests/build.gradle b/core-tests/build.gradle index c5e184e448..ea5f7d4ecf 100644 --- a/core-tests/build.gradle +++ b/core-tests/build.gradle @@ -92,6 +92,8 @@ dependencies { smokeTestCompile project(':smoke-test-utils') smokeTestCompile "org.assertj:assertj-core:${assertj_version}" + // used by FinalityFlowTests + testCompile project(':testing:cordapps:cashobservers') } configurations { diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt index d9857a3350..91c5cd3f15 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt @@ -46,6 +46,7 @@ import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.issuedBy +import net.corda.finance.test.flows.CashIssueWithObserversFlow import net.corda.node.services.persistence.DBTransactionStorage import net.corda.testing.contracts.DummyContract import net.corda.testing.core.ALICE_NAME @@ -64,6 +65,7 @@ import net.corda.testing.node.internal.TestCordappInternal import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.cordappWithPackages import net.corda.testing.node.internal.enclosedCordapp +import net.corda.testing.node.internal.findCordapp import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Test @@ -79,6 +81,7 @@ class FinalityFlowTests : WithFinality { } override val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP, DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(), + findCordapp("net.corda.finance.test.flows"), CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java)))) private val aliceNode = makeNode(ALICE_NAME) @@ -223,7 +226,7 @@ class FinalityFlowTests : WithFinality { assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertTxnRemovedFromDatabase(aliceNode, stxId) val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() - assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatus) + assertEquals(TransactionStatus.IN_FLIGHT, txnStatus) } } @@ -295,7 +298,7 @@ class FinalityFlowTests : WithFinality { val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() - assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob) + assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob) // now lets attempt a new spend with the new output of the previous transaction val newStateRef = notarisedStxn1.coreTransaction.outRef(1) @@ -309,7 +312,7 @@ class FinalityFlowTests : WithFinality { val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2) val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() - assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob2) + assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob2) // Validate attempt at flow finalisation by Bob has no effect on outcome. val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow() @@ -328,10 +331,22 @@ class FinalityFlowTests : WithFinality { catch (e: UnexpectedFlowEndException) { val stxId = SecureHash.parse(e.message) val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() - assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob) + assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob) } } + @Test(timeout=300_000) + fun `two phase finality flow issuance transaction with observers`() { + val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY) + + val stx = aliceNode.startFlowAndRunNetwork(CashIssueWithObserversFlow( + Amount(1000L, GBP), OpaqueBytes.of(1), notary, + observers = setOf(bobNode.info.singleIdentity()))).resultFuture.getOrThrow().stx + + assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull + } + @StartableByRPC class IssueFlow(val notary: Party) : FlowLogic>() { diff --git a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt index 6cdbc2b832..57639dcd22 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt @@ -27,7 +27,7 @@ import java.time.Duration /** * Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction * is acceptable then it is from that point onwards committed to the ledger, and will be written through to the - * vault. Additionally it will be distributed to the parties reflected in the participants list of the states. + * vault. Additionally, it will be distributed to the parties reflected in the participants list of the states. * * By default, the initiating flow will commit states that are relevant to the initiating party as indicated by * [StatesToRecord.ONLY_RELEVANT]. Relevance is determined by the union of all participants to states which have been @@ -159,6 +159,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, } @Suspendable + @Suppress("ComplexMethod", "NestedBlockDepth") @Throws(NotaryException::class) override fun call(): SignedTransaction { if (!newApi) { @@ -181,12 +182,12 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, val externalTxParticipants = extractExternalParticipants(ledgerTransaction) if (newApi) { - val sessionParties = sessions.map { it.counterparty } - val missingRecipients = externalTxParticipants - sessionParties - oldParticipants + val sessionParties = sessions.map { it.counterparty }.toSet() + val missingRecipients = externalTxParticipants - sessionParties - oldParticipants.toSet() require(missingRecipients.isEmpty()) { "Flow sessions were not provided for the following transaction participants: $missingRecipients" } - sessionParties.intersect(oldParticipants).let { + sessionParties.intersect(oldParticipants.toSet()).let { require(it.isEmpty()) { "The following parties are specified both in flow sessions and in the oldParticipants list: $it" } } } @@ -202,41 +203,48 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, serviceHub.networkMapCache.getNodeByLegalIdentity(it.counterparty)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY } + val requiresNotarisation = needsNotarySignature(transaction) val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY - if (useTwoPhaseFinality && needsNotarySignature(transaction)) { - recordLocallyAndBroadcast(newPlatformSessions, transaction) - } - - try { - val stxn = notariseOrRecord() - val notarySignatures = stxn.sigs - transaction.sigs.toSet() - if (notarySignatures.isNotEmpty()) { - if (useTwoPhaseFinality && newPlatformSessions.isNotEmpty()) { - broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures) - } else { - progressTracker.currentStep = FINALISING_TRANSACTION - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { - (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord) - logger.info("Finalised transaction locally.") + if (useTwoPhaseFinality) { + val stxn = if (requiresNotarisation) { + recordLocallyAndBroadcast(newPlatformSessions, transaction) + try { + val (notarisedTxn, notarySignatures) = notarise() + if (newPlatformSessions.isNotEmpty()) { + broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures) + } else { + finaliseLocally(notarisedTxn, notarySignatures) } + notarisedTxn + } catch (e: NotaryException) { + (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(transaction.id) + if (newPlatformSessions.isNotEmpty()) { + broadcastNotaryError(newPlatformSessions, e) + } else sleep(Duration.ZERO) // force checkpoint to persist db update. + throw e } } - - if (!useTwoPhaseFinality || !needsNotarySignature(transaction)) { - broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn) - } else if (useTwoPhaseFinality && oldPlatformSessions.isNotEmpty()) { - broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn) + else { + if (newPlatformSessions.isNotEmpty()) + finaliseLocallyAndBroadcast(newPlatformSessions, transaction, + FlowTransactionMetadata( + serviceHub.myInfo.legalIdentities.first().name, + statesToRecord, + sessions.map { it.counterparty.name }.toSet())) + else + recordTransactionLocally(transaction) + transaction } + broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn) return stxn } - catch (e: NotaryException) { - if (useTwoPhaseFinality) { - (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(transaction.id) - if (newPlatformSessions.isNotEmpty()) { - broadcastNotaryError(newPlatformSessions, e) - } else sleep(Duration.ZERO) // force checkpoint to persist db update. - } - throw e + else { + val stxn = if (requiresNotarisation) { + notarise().first + } else transaction + recordTransactionLocally(stxn) + broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn) + return stxn } } @@ -244,16 +252,30 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, private fun recordLocallyAndBroadcast(sessions: Collection, tx: SignedTransaction) { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordLocallyAndBroadcast", flowLogic = this) { recordUnnotarisedTransaction(tx) - logger.info("Recorded transaction without notary signature locally.") - if (sessions.isEmpty()) return progressTracker.currentStep = BROADCASTING_PRE_NOTARISATION + broadcast(sessions, tx) + } + } + + @Suspendable + private fun finaliseLocallyAndBroadcast(sessions: Collection, tx: SignedTransaction, metadata: FlowTransactionMetadata) { + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocallyAndBroadcast", flowLogic = this) { + finaliseLocally(tx, metadata = metadata) + progressTracker.currentStep = BROADCASTING + broadcast(sessions, tx) + } + } + + @Suspendable + private fun broadcast(sessions: Collection, tx: SignedTransaction) { + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcast", flowLogic = this) { sessions.forEach { session -> try { logger.debug { "Sending transaction to party $session." } subFlow(SendTransactionFlow(session, tx)) } catch (e: UnexpectedFlowEndException) { throw UnexpectedFlowEndException( - "${session.counterparty} has finished prematurely and we're trying to send them a transaction without notary signature. " + + "${session.counterparty} has finished prematurely and we're trying to send them a transaction." + "Did they forget to call ReceiveFinalityFlow? (${e.message})", e.cause, e.originalErrorId @@ -282,9 +304,20 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, ) } } - progressTracker.currentStep = FINALISING_TRANSACTION - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { - (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord) + finaliseLocally(transaction, notarySignatures) + } + } + + @Suspendable + private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List = emptyList(), + metadata: FlowTransactionMetadata? = null) { + progressTracker.currentStep = FINALISING_TRANSACTION + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocally", flowLogic = this) { + if (notarySignatures.isEmpty()) { + (serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord, metadata!!) + logger.info("Finalised transaction locally.") + } else { + (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) logger.info("Finalised transaction locally with notary signature.") } } @@ -376,22 +409,17 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, serviceHub.myInfo.legalIdentities.first().name, statesToRecord, sessions.map { it.counterparty.name }.toSet())) + logger.info("Recorded un-notarised transaction locally.") return tx } } @Suspendable - private fun notariseOrRecord(): SignedTransaction { - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseOrRecord", flowLogic = this) { - return if (needsNotarySignature(transaction)) { - progressTracker.currentStep = NOTARISING - val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true)) - transaction + notarySignatures - } else { - logger.info("No need to notarise this transaction. Recording locally.") - recordTransactionLocally(transaction) - transaction - } + private fun notarise(): Pair> { + return serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseOrRecord", flowLogic = this) { + progressTracker.currentStep = NOTARISING + val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true)) + Pair(transaction + notarySignatures, notarySignatures) } } @@ -409,7 +437,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction, private fun extractExternalParticipants(ltx: LedgerTransaction): Set { val participants = ltx.outputStates.flatMap { it.participants } + ltx.inputStates.flatMap { it.participants } - return groupAbstractPartyByWellKnownParty(serviceHub, participants).keys - serviceHub.myInfo.legalIdentities + return groupAbstractPartyByWellKnownParty(serviceHub, participants).keys - serviceHub.myInfo.legalIdentities.toSet() } private fun verifyTx(): LedgerTransaction { @@ -456,40 +484,49 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession private val expectedTxId: SecureHash? = null, private val statesToRecord: StatesToRecord = ONLY_RELEVANT, private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic() { - @Suppress("ComplexMethod") + @Suppress("ComplexMethod", "NestedBlockDepth") @Suspendable override fun call(): SignedTransaction { val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true)) + val requiresNotarisation = needsNotarySignature(stx) val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY - if (fromTwoPhaseFinalityNode && needsNotarySignature(stx)) { - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) { - logger.debug { "Peer recording transaction without notary signature." } - (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx, - FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord)) - } - otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck) - logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.") - - try { - val notarySignatures = otherSideSession.receive>>().unwrap { it.getOrThrow() } - serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { - logger.debug { "Peer received notarised signature." } - (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) - logger.info("Peer finalised transaction with notary signature.") + if (fromTwoPhaseFinalityNode) { + if (requiresNotarisation) { + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) { + logger.debug { "Peer recording transaction without notary signature." } + (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx, + FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord)) } - } catch (e: NotaryException) { - logger.info("Peer received notary error.") - val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?: + otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck) + logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.") + try { + val notarySignatures = otherSideSession.receive>>().unwrap { it.getOrThrow() } + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { + logger.debug { "Peer received notarised signature." } + (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord) + logger.info("Peer finalised transaction with notary signature.") + } + } catch (e: NotaryException) { + logger.info("Peer received notary error.") + val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?: (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY) - if (overrideHandlePropagatedNotaryError) { - (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id) - sleep(Duration.ZERO) // force checkpoint to persist db update. - throw e + if (overrideHandlePropagatedNotaryError) { + (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id) + sleep(Duration.ZERO) // force checkpoint to persist db update. + throw e + } + else { + otherSideSession.receive() // simulate unexpected flow end + } } - else { - otherSideSession.receive() // simulate unexpected flow end + } else { + serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) { + (serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord, + FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord)) + logger.info("Peer recorded transaction with recovery metadata.") } + otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck) } } else { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) { diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt index 49ecae6151..f66d6990ea 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt @@ -33,5 +33,5 @@ data class FlowTransactionMetadata( enum class TransactionStatus { UNVERIFIED, VERIFIED, - MISSING_NOTARY_SIG; + IN_FLIGHT; } \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt index 36b44ed0e1..5e63caebec 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt @@ -30,13 +30,14 @@ interface ServiceHubCoreInternal : ServiceHub { val attachmentsClassLoaderCache: AttachmentsClassLoaderCache /** - * Stores [SignedTransaction] and participant signatures without the notary signature in the local transaction storage. - * Optionally add finality flow recovery metadata. + * Stores [SignedTransaction] and participant signatures without the notary signature in the local transaction storage, + * inclusive of flow recovery metadata. * This is expected to be run within a database transaction. * * @param txn The transaction to record. + * @param metadata Finality flow recovery metadata. */ - fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?= null) + fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) /** * Removes transaction from data store. @@ -54,6 +55,15 @@ interface ServiceHubCoreInternal : ServiceHub { * @param statesToRecord how the vault should treat the output states of the transaction. */ fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection, statesToRecord: StatesToRecord) + + /** + * Records a [SignedTransaction] as VERIFIED with flow recovery metadata. + * + * @param txn The transaction to record. + * @param statesToRecord how the vault should treat the output states of the transaction. + * @param metadata Finality flow recovery metadata. + */ + fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) } interface TransactionsResolver { diff --git a/node/build.gradle b/node/build.gradle index 2f2ba885ad..05dc5e58a5 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -251,6 +251,9 @@ dependencies { integrationTestCompile(project(":testing:cordapps:missingmigration")) testCompile project(':testing:cordapps:dbfailure:dbfworkflows') + + // used by FinalityFlowErrorHandlingTest + slowIntegrationTestCompile project(':testing:cordapps:cashobservers') } tasks.withType(JavaCompile).configureEach { diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt new file mode 100644 index 0000000000..bb73f7da04 --- /dev/null +++ b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FinalityFlowErrorHandlingTest.kt @@ -0,0 +1,101 @@ +package net.corda.node.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.CordaRuntimeException +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.finance.DOLLARS +import net.corda.finance.test.flows.CashIssueWithObserversFlow +import net.corda.node.services.statemachine.StateMachineErrorHandlingTest +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.flows.waitForAllFlowsToComplete +import net.corda.testing.node.NotarySpec +import net.corda.testing.node.internal.FINANCE_CORDAPPS +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import kotlin.test.assertEquals +import kotlin.test.fail + +class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() { + + /** + * Throws an exception after recording an issuance transaction but before broadcasting the transaction to observer sessions. + * + */ + @Test(timeout = 300_000) + fun `error after recording an issuance transaction inside of FinalityFlow generates recovery metadata`() { + startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false), + extraCordappPackagesToScan = listOf("net.corda.node.flows", "net.corda.finance.test.flows")) { + val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME, FINANCE_CORDAPPS + enclosedCordapp()) + + val rules = """ + RULE Set flag when entering receive finality flow + CLASS ${FinalityFlow::class.java.name} + METHOD call + AT ENTRY + IF !flagged("finality_flag") + DO flag("finality_flag"); traceln("Setting finality flag") + ENDRULE + + RULE Throw exception when recording transaction + CLASS ${FinalityFlow::class.java.name} + METHOD finaliseLocallyAndBroadcast + AT EXIT + IF flagged("finality_flag") + DO traceln("Throwing exception"); + throw new java.lang.RuntimeException("die dammit die") + ENDRULE + """.trimIndent() + + submitBytemanRules(rules, port) + + try { + alice.rpc.startFlow( + ::CashIssueWithObserversFlow, + 500.DOLLARS, + OpaqueBytes.of(0x01), + defaultNotaryIdentity, + setOf(charlie.nodeInfo.singleIdentity()) + ).returnValue.getOrThrow(30.seconds) + fail() + } + catch (e: CordaRuntimeException) { + waitForAllFlowsToComplete(alice) + val txId = alice.rpc.stateMachineRecordedTransactionMappingSnapshot().single().transactionId + + alice.rpc.startFlow(::GetFlowTransaction, txId).returnValue.getOrThrow().apply { + assertEquals("V", this.first) // "V" -> VERIFIED + assertEquals(ALICE_NAME.toString(), this.second) // initiator + assertEquals(CHARLIE_NAME.toString(), this.third) // peers + } + } + } + } +} + +// Internal use for testing only!! +@StartableByRPC +class GetFlowTransaction(private val txId: SecureHash) : FlowLogic>() { + @Suspendable + override fun call(): Triple { + return serviceHub.jdbcSession().prepareStatement("select * from node_transactions where tx_id = ?") + .apply { setString(1, txId.toString()) } + .use { ps -> + ps.executeQuery().use { rs -> + rs.next() + Triple(rs.getString(4), // TransactionStatus + rs.getString(7), // initiator + rs.getString(8)) // participants + } + } + } +} \ No newline at end of file diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt index 3e0882e62e..8885b936ee 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt @@ -49,13 +49,16 @@ abstract class StateMachineErrorHandlingTest { counter = 0 } - internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) { + internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), + extraCordappPackagesToScan: List = emptyList(), + dsl: DriverDSL.() -> Unit) { driver( DriverParameters( notarySpecs = listOf(notarySpec), startNodesInProcess = false, inMemoryDB = false, - systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") + systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true"), + extraCordappPackagesToScan = extraCordappPackagesToScan ) ) { dsl() diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt index 67a8f8144b..f14f60cc5b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt @@ -354,6 +354,7 @@ class FlowHospitalTest { it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds) } waitForAllFlowsToComplete(nodeAHandle) + waitForAllFlowsToComplete(nodeBHandle) } assertEquals(0, dischargedCounter) assertEquals(1, observationCounter) diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index cb917e0b51..60df01f809 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -160,7 +160,6 @@ interface ServiceHubInternal : ServiceHubCoreInternal { vaultService: VaultServiceInternal, database: CordaPersistence) { database.transaction { - require(sigs.isNotEmpty()) { "No signatures passed in for recording" } recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) { validatedTransactions.finalizeTransactionWithExtraSignatures(it, sigs) } @@ -227,6 +226,7 @@ interface ServiceHubInternal : ServiceHubCoreInternal { override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection, statesToRecord: StatesToRecord) { requireSupportedHashType(txn) + require(sigs.isNotEmpty()) { "No signatures passed in for recording" } if (txn.coreTransaction is WireTransaction) (txn + sigs).verifyRequiredSignatures() finalizeTransactionWithExtraSignatures( @@ -240,7 +240,18 @@ interface ServiceHubInternal : ServiceHubCoreInternal { ) } - override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) { + override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) { + requireSupportedHashType(txn) + if (txn.coreTransaction is WireTransaction) + txn.verifyRequiredSignatures() + database.transaction { + recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) { + validatedTransactions.finalizeTransaction(txn, metadata) + } + } + } + + override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) { if (txn.coreTransaction is WireTransaction) { txn.notary?.let { notary -> txn.verifySignaturesExcept(notary.owningKey) @@ -344,13 +355,13 @@ interface WritableTransactionStorage : TransactionStorage { fun addTransaction(transaction: SignedTransaction): Boolean /** - * Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG*. - * Optionally add finality flow recovery metadata. + * Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG* and inclusive of flow recovery metadata. + * * @param transaction The transaction to be recorded. * @param metadata Finality flow recovery metadata. * @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists. */ - fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata? = null): Boolean + fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean /** * Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store. @@ -358,6 +369,15 @@ interface WritableTransactionStorage : TransactionStorage { */ fun removeUnnotarisedTransaction(id: SecureHash): Boolean + /** + * Add a finalised transaction to the store with flow recovery metadata. + * + * @param transaction The transaction to be recorded. + * @param metadata Finality flow recovery metadata. + * @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists. + */ + fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean + /** * Update a previously un-notarised transaction including associated notary signatures. * @param transaction The notarised transaction to be finalized. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 7773fdcd22..dd69c26fc3 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -103,13 +103,13 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: enum class TransactionStatus { UNVERIFIED, VERIFIED, - MISSING_NOTARY_SIG; + IN_FLIGHT; fun toDatabaseValue(): String { return when (this) { UNVERIFIED -> "U" VERIFIED -> "V" - MISSING_NOTARY_SIG -> "M" + IN_FLIGHT -> "F" } } @@ -121,7 +121,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: return when(this) { UNVERIFIED -> net.corda.core.flows.TransactionStatus.UNVERIFIED VERIFIED -> net.corda.core.flows.TransactionStatus.VERIFIED - MISSING_NOTARY_SIG -> net.corda.core.flows.TransactionStatus.MISSING_NOTARY_SIG + IN_FLIGHT -> net.corda.core.flows.TransactionStatus.IN_FLIGHT } } @@ -130,7 +130,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: return when (databaseValue) { "V" -> VERIFIED "U" -> UNVERIFIED - "M" -> MISSING_NOTARY_SIG + "F" -> IN_FLIGHT else -> throw UnexpectedStatusValueException(databaseValue) } } @@ -241,7 +241,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: criteriaUpdate.set(updateRoot.get(DBTransaction::status.name), TransactionStatus.VERIFIED) criteriaUpdate.where(criteriaBuilder.and( criteriaBuilder.equal(updateRoot.get(DBTransaction::txId.name), txId.toString()), - criteriaBuilder.and(updateRoot.get(DBTransaction::status.name).`in`(setOf(TransactionStatus.UNVERIFIED, TransactionStatus.MISSING_NOTARY_SIG)) + criteriaBuilder.and(updateRoot.get(DBTransaction::status.name).`in`(setOf(TransactionStatus.UNVERIFIED, TransactionStatus.IN_FLIGHT)) ))) criteriaUpdate.set(updateRoot.get(DBTransaction::timestamp.name), clock.instant()) val update = session.createQuery(criteriaUpdate) @@ -254,20 +254,15 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: updateTransaction(transaction.id) } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?) = - database.transaction { - txStorage.locked { - val cacheValue = TxCacheValue(transaction, status = TransactionStatus.MISSING_NOTARY_SIG, metadata = metadata) - val added = addWithDuplicatesAllowed(transaction.id, cacheValue) - if (added) { - logger.info ("Transaction ${transaction.id} recorded as un-notarised.") - } else { - logger.info("Transaction ${transaction.id} (un-notarised) already exists so no need to record.") - } - added - } + override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) = + addTransaction(transaction, metadata, TransactionStatus.IN_FLIGHT) { + false } + override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) = + addTransaction(transaction, metadata) { + false + } override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { return database.transaction { val session = currentDBSession() @@ -276,7 +271,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: val root = delete.from(DBTransaction::class.java) delete.where(criteriaBuilder.and( criteriaBuilder.equal(root.get(DBTransaction::txId.name), id.toString()), - criteriaBuilder.equal(root.get(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG) + criteriaBuilder.equal(root.get(DBTransaction::status.name), TransactionStatus.IN_FLIGHT) )) if (session.createQuery(delete).executeUpdate() != 0) { txStorage.locked { @@ -294,16 +289,21 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: finalizeTransactionWithExtraSignatures(transaction.id, signatures) } - private fun addTransaction(transaction: SignedTransaction, updateFn: (SecureHash) -> Boolean): Boolean { + private fun addTransaction(transaction: SignedTransaction, + metadata: FlowTransactionMetadata? = null, + status: TransactionStatus = TransactionStatus.VERIFIED, + updateFn: (SecureHash) -> Boolean): Boolean { return database.transaction { txStorage.locked { - val cachedValue = TxCacheValue(transaction, TransactionStatus.VERIFIED) + val cachedValue = TxCacheValue(transaction, status, metadata) val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateFn(k) } if (addedOrUpdated) { - logger.debug { "Transaction ${transaction.id} has been recorded as verified" } - onNewTx(transaction) + logger.debug { "Transaction ${transaction.id} has been recorded as $status" } + if (status.isVerified()) + onNewTx(transaction) + true } else { - logger.debug { "Transaction ${transaction.id} is already recorded as verified, so no need to re-record" } + logger.debug { "Transaction ${transaction.id} is already recorded as $status, so no need to re-record" } false } } @@ -320,7 +320,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: criteriaUpdate.set(updateRoot.get(DBTransaction::status.name), TransactionStatus.VERIFIED) criteriaUpdate.where(criteriaBuilder.and( criteriaBuilder.equal(updateRoot.get(DBTransaction::txId.name), txId.toString()), - criteriaBuilder.equal(updateRoot.get(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG) + criteriaBuilder.equal(updateRoot.get(DBTransaction::status.name), TransactionStatus.IN_FLIGHT) )) criteriaUpdate.set(updateRoot.get(DBTransaction::timestamp.name), clock.instant()) val update = session.createQuery(criteriaUpdate) @@ -360,7 +360,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: txStorage.locked { val cacheValue = TxCacheValue(transaction, status = TransactionStatus.UNVERIFIED) val added = addWithDuplicatesAllowed(transaction.id, cacheValue) { k, v, existingEntry -> - if (existingEntry.status == TransactionStatus.MISSING_NOTARY_SIG) { + if (existingEntry.status == TransactionStatus.IN_FLIGHT) { session.merge(toPersistentEntity(k, v)) true } else false diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 7e3ed4f2f9..d1436c94da 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -801,10 +801,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { return true } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean { + override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean { database.transaction { records.add(TxRecord.Add(transaction)) - delegate.addUnnotarisedTransaction(transaction) + delegate.addUnnotarisedTransaction(transaction, metadata) } return true } @@ -815,6 +815,13 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { } } + override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean { + database.transaction { + delegate.finalizeTransaction(transaction, metadata) + } + return true + } + override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection) : Boolean { database.transaction { delegate.finalizeTransactionWithExtraSignatures(transaction, signatures) diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 16889e886e..cfc9c3ecf1 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -19,7 +19,7 @@ import net.corda.core.transactions.WireTransaction import net.corda.node.CordaClock import net.corda.node.MutableClock import net.corda.node.SimpleClock -import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.MISSING_NOTARY_SIG +import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.IN_FLIGHT import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.UNVERIFIED import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.VERIFIED import net.corda.node.services.transactions.PersistentUniquenessProvider @@ -113,8 +113,8 @@ class DBTransactionStorageTests { val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction() - transactionStorage.addUnnotarisedTransaction(transaction) - assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) + transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name)) + assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) } @Test(timeout = 300_000) @@ -125,7 +125,7 @@ class DBTransactionStorageTests { val transaction = newTransaction() transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name, StatesToRecord.ALL_VISIBLE, setOf(BOB_PARTY.name))) val txn = readTransactionFromDB(transaction.id) - assertEquals(MISSING_NOTARY_SIG, txn.status) + assertEquals(IN_FLIGHT, txn.status) assertEquals(StatesToRecord.ALL_VISIBLE, txn.statesToRecord) assertEquals(ALICE_NAME.toString(), txn.initiator) assertEquals(listOf(BOB_NAME.toString()), txn.participants) @@ -144,15 +144,46 @@ class DBTransactionStorageTests { } } + @Test(timeout = 300_000) + fun `finalize transaction after recording transaction as un-notarised`() { + val now = Instant.ofEpochSecond(333444555L) + val transactionClock = TransactionClock(now) + newTransactionStorage(clock = transactionClock) + val transaction = newTransaction(notarySig = false) + transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name)) + assertNull(transactionStorage.getTransaction(transaction.id)) + assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) + transactionStorage.finalizeTransactionWithExtraSignatures(transaction, emptyList()) + readTransactionFromDB(transaction.id).let { + assertSignatures(it.transaction, it.signatures, transaction.sigs) + assertEquals(VERIFIED, it.status) + } + } + + @Test(timeout = 300_000) + fun `finalize transaction with recovery metadata`() { + val now = Instant.ofEpochSecond(333444555L) + val transactionClock = TransactionClock(now) + newTransactionStorage(clock = transactionClock) + val transaction = newTransaction(notarySig = false) + transactionStorage.finalizeTransaction(transaction, + FlowTransactionMetadata(ALICE_NAME)) + readTransactionFromDB(transaction.id).let { + assertEquals(VERIFIED, it.status) + assertEquals(ALICE_NAME.toString(), it.initiator) + assertEquals(StatesToRecord.ONLY_RELEVANT, it.statesToRecord) + } + } + @Test(timeout = 300_000) fun `finalize transaction with extra signatures after recording transaction as un-notarised`() { val now = Instant.ofEpochSecond(333444555L) val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction(notarySig = false) - transactionStorage.addUnnotarisedTransaction(transaction) + transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name)) assertNull(transactionStorage.getTransaction(transaction.id)) - assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) + assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) val notarySig = notarySig(transaction.id) transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig)) readTransactionFromDB(transaction.id).let { @@ -167,9 +198,9 @@ class DBTransactionStorageTests { val transactionClock = TransactionClock(now) newTransactionStorage(clock = transactionClock) val transaction = newTransaction(notarySig = false) - transactionStorage.addUnnotarisedTransaction(transaction) + transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name)) assertNull(transactionStorage.getTransaction(transaction.id)) - assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) + assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status) assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id)) assertFailsWith { readTransactionFromDB(transaction.id).status } @@ -201,8 +232,8 @@ class DBTransactionStorageTests { val transactionWithoutNotarySig = newTransaction(notarySig = false) // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow) - transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig) - assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySig.id).status) + transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig, FlowTransactionMetadata(ALICE.party.name)) + assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySig.id).status) // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow) val notarySig = notarySig(transactionWithoutNotarySig.id) @@ -232,8 +263,8 @@ class DBTransactionStorageTests { val transactionWithoutNotarySigs = newTransaction(notarySig = false) // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow) - transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs) - assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySigs.id).status) + transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs, FlowTransactionMetadata(ALICE.party.name)) + assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySigs.id).status) // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow) val notarySig = notarySig(transactionWithoutNotarySigs.id) diff --git a/settings.gradle b/settings.gradle index 395b3a40b6..9c1a33ad3e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -102,6 +102,7 @@ include 'testing:cordapps:dbfailure:dbfcontracts' include 'testing:cordapps:dbfailure:dbfworkflows' include 'testing:cordapps:missingmigration' include 'testing:cordapps:sleeping' +include 'testing:cordapps:cashobservers' // Common libraries - start include 'common-validation' diff --git a/testing/cordapps/cashobservers/build.gradle b/testing/cordapps/cashobservers/build.gradle new file mode 100644 index 0000000000..e8f0d47d5f --- /dev/null +++ b/testing/cordapps/cashobservers/build.gradle @@ -0,0 +1,17 @@ +apply plugin: 'kotlin' +//apply plugin: 'net.corda.plugins.cordapp' +//apply plugin: 'net.corda.plugins.quasar-utils' + +dependencies { + compile project(":core") + compile project(':finance:workflows') +} + +jar { + baseName "testing-cashobservers-cordapp" + manifest { + // This JAR is part of Corda's testing framework. + // Driver will not include it as part of an out-of-process node. + attributes('Corda-Testing': true) + } +} \ No newline at end of file diff --git a/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt b/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt new file mode 100644 index 0000000000..9829f0e4c5 --- /dev/null +++ b/testing/cordapps/cashobservers/src/main/kotlin/net/corda/finance/test/flows/CashIssueWithObserversFlow.kt @@ -0,0 +1,48 @@ +package net.corda.finance.test.flows + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.Amount +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.OpaqueBytes +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.flows.AbstractCashFlow +import net.corda.finance.issuedBy +import java.util.Currency + +@StartableByRPC +@InitiatingFlow +class CashIssueWithObserversFlow(private val amount: Amount, + private val issuerBankPartyRef: OpaqueBytes, + private val notary: Party, + private val observers: Set) : AbstractCashFlow(tracker()) { + @Suspendable + override fun call(): Result { + progressTracker.currentStep = Companion.GENERATING_TX + val builder = TransactionBuilder(notary) + val issuer = ourIdentity.ref(issuerBankPartyRef) + val signers = Cash().generateIssue(builder, amount.issuedBy(issuer), ourIdentity, notary) + progressTracker.currentStep = Companion.SIGNING_TX + val tx = serviceHub.signInitialTransaction(builder, signers) + progressTracker.currentStep = Companion.FINALISING_TX + val observerSessions = observers.map { initiateFlow(it) } + val notarised = finaliseTx(tx, observerSessions, "Unable to notarise issue") + return Result(notarised, ourIdentity) + } +} + +@InitiatedBy(CashIssueWithObserversFlow::class) +class CashIssueReceiverFlowWithObservers(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) { + subFlow(ReceiveFinalityFlow(otherSide)) + } + } +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt index e0beb1ec00..00593c35ee 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt @@ -55,14 +55,17 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali } } - override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean { - return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.MISSING_NOTARY_SIG)) == null + override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean { + return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.IN_FLIGHT)) == null } override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { return txns.remove(id) != null } + override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) = + addTransaction(transaction) + override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection): Boolean { val current = txns.replace(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED)) return if (current != null) { diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt index 0a63b813b1..103e954b5d 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt @@ -139,11 +139,13 @@ data class TestTransactionDSLInterpreter private constructor( override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory()) - override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {} + override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) {} override fun removeUnnotarisedTransaction(id: SecureHash) {} override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection, statesToRecord: StatesToRecord) {} + + override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) {} } private fun copy(): TestTransactionDSLInterpreter =