ENT-9842 Re-factor 2PF to support issuance transactions (no notarisation) with observers. (#7349)

Re-factor 2PF to support issuance transactions (no notarisation) with observers.
This commit is contained in:
Jose Coll
2023-04-27 16:58:17 +01:00
committed by GitHub
parent 9ba3919980
commit c3e39a7052
18 changed files with 433 additions and 132 deletions

View File

@ -92,6 +92,8 @@ dependencies {
smokeTestCompile project(':smoke-test-utils') smokeTestCompile project(':smoke-test-utils')
smokeTestCompile "org.assertj:assertj-core:${assertj_version}" smokeTestCompile "org.assertj:assertj-core:${assertj_version}"
// used by FinalityFlowTests
testCompile project(':testing:cordapps:cashobservers')
} }
configurations { configurations {

View File

@ -46,6 +46,7 @@ import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.issuedBy import net.corda.finance.issuedBy
import net.corda.finance.test.flows.CashIssueWithObserversFlow
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
@ -64,6 +65,7 @@ import net.corda.testing.node.internal.TestCordappInternal
import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.cordappWithPackages import net.corda.testing.node.internal.cordappWithPackages
import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.findCordapp
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Test import org.junit.Test
@ -79,6 +81,7 @@ class FinalityFlowTests : WithFinality {
} }
override val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP, DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(), override val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP, DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(),
findCordapp("net.corda.finance.test.flows"),
CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java)))) CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
private val aliceNode = makeNode(ALICE_NAME) private val aliceNode = makeNode(ALICE_NAME)
@ -223,7 +226,7 @@ class FinalityFlowTests : WithFinality {
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId)) assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
assertTxnRemovedFromDatabase(aliceNode, stxId) assertTxnRemovedFromDatabase(aliceNode, stxId)
val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() val (_, txnStatus) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatus) assertEquals(TransactionStatus.IN_FLIGHT, txnStatus)
} }
} }
@ -295,7 +298,7 @@ class FinalityFlowTests : WithFinality {
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob) assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob)
// now lets attempt a new spend with the new output of the previous transaction // now lets attempt a new spend with the new output of the previous transaction
val newStateRef = notarisedStxn1.coreTransaction.outRef<DummyContract.SingleOwnerState>(1) val newStateRef = notarisedStxn1.coreTransaction.outRef<DummyContract.SingleOwnerState>(1)
@ -309,7 +312,7 @@ class FinalityFlowTests : WithFinality {
val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2) assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2)
val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail() val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob2) assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob2)
// Validate attempt at flow finalisation by Bob has no effect on outcome. // Validate attempt at flow finalisation by Bob has no effect on outcome.
val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow() val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow()
@ -328,10 +331,22 @@ class FinalityFlowTests : WithFinality {
catch (e: UnexpectedFlowEndException) { catch (e: UnexpectedFlowEndException) {
val stxId = SecureHash.parse(e.message) val stxId = SecureHash.parse(e.message)
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail() val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob) assertEquals(TransactionStatus.IN_FLIGHT, txnStatusBob)
} }
} }
@Test(timeout=300_000)
fun `two phase finality flow issuance transaction with observers`() {
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
val stx = aliceNode.startFlowAndRunNetwork(CashIssueWithObserversFlow(
Amount(1000L, GBP), OpaqueBytes.of(1), notary,
observers = setOf(bobNode.info.singleIdentity()))).resultFuture.getOrThrow().stx
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
}
@StartableByRPC @StartableByRPC
class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<DummyContract.SingleOwnerState>>() { class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<DummyContract.SingleOwnerState>>() {

View File

@ -27,7 +27,7 @@ import java.time.Duration
/** /**
* Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction * Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction
* is acceptable then it is from that point onwards committed to the ledger, and will be written through to the * is acceptable then it is from that point onwards committed to the ledger, and will be written through to the
* vault. Additionally it will be distributed to the parties reflected in the participants list of the states. * vault. Additionally, it will be distributed to the parties reflected in the participants list of the states.
* *
* By default, the initiating flow will commit states that are relevant to the initiating party as indicated by * By default, the initiating flow will commit states that are relevant to the initiating party as indicated by
* [StatesToRecord.ONLY_RELEVANT]. Relevance is determined by the union of all participants to states which have been * [StatesToRecord.ONLY_RELEVANT]. Relevance is determined by the union of all participants to states which have been
@ -159,6 +159,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
} }
@Suspendable @Suspendable
@Suppress("ComplexMethod", "NestedBlockDepth")
@Throws(NotaryException::class) @Throws(NotaryException::class)
override fun call(): SignedTransaction { override fun call(): SignedTransaction {
if (!newApi) { if (!newApi) {
@ -181,12 +182,12 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
val externalTxParticipants = extractExternalParticipants(ledgerTransaction) val externalTxParticipants = extractExternalParticipants(ledgerTransaction)
if (newApi) { if (newApi) {
val sessionParties = sessions.map { it.counterparty } val sessionParties = sessions.map { it.counterparty }.toSet()
val missingRecipients = externalTxParticipants - sessionParties - oldParticipants val missingRecipients = externalTxParticipants - sessionParties - oldParticipants.toSet()
require(missingRecipients.isEmpty()) { require(missingRecipients.isEmpty()) {
"Flow sessions were not provided for the following transaction participants: $missingRecipients" "Flow sessions were not provided for the following transaction participants: $missingRecipients"
} }
sessionParties.intersect(oldParticipants).let { sessionParties.intersect(oldParticipants.toSet()).let {
require(it.isEmpty()) { "The following parties are specified both in flow sessions and in the oldParticipants list: $it" } require(it.isEmpty()) { "The following parties are specified both in flow sessions and in the oldParticipants list: $it" }
} }
} }
@ -202,41 +203,48 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
serviceHub.networkMapCache.getNodeByLegalIdentity(it.counterparty)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY serviceHub.networkMapCache.getNodeByLegalIdentity(it.counterparty)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY
} }
val requiresNotarisation = needsNotarySignature(transaction)
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (useTwoPhaseFinality && needsNotarySignature(transaction)) { if (useTwoPhaseFinality) {
recordLocallyAndBroadcast(newPlatformSessions, transaction) val stxn = if (requiresNotarisation) {
} recordLocallyAndBroadcast(newPlatformSessions, transaction)
try {
try { val (notarisedTxn, notarySignatures) = notarise()
val stxn = notariseOrRecord() if (newPlatformSessions.isNotEmpty()) {
val notarySignatures = stxn.sigs - transaction.sigs.toSet() broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures)
if (notarySignatures.isNotEmpty()) { } else {
if (useTwoPhaseFinality && newPlatformSessions.isNotEmpty()) { finaliseLocally(notarisedTxn, notarySignatures)
broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures)
} else {
progressTracker.currentStep = FINALISING_TRANSACTION
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord)
logger.info("Finalised transaction locally.")
} }
notarisedTxn
} catch (e: NotaryException) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(transaction.id)
if (newPlatformSessions.isNotEmpty()) {
broadcastNotaryError(newPlatformSessions, e)
} else sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e
} }
} }
else {
if (!useTwoPhaseFinality || !needsNotarySignature(transaction)) { if (newPlatformSessions.isNotEmpty())
broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn) finaliseLocallyAndBroadcast(newPlatformSessions, transaction,
} else if (useTwoPhaseFinality && oldPlatformSessions.isNotEmpty()) { FlowTransactionMetadata(
broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn) serviceHub.myInfo.legalIdentities.first().name,
statesToRecord,
sessions.map { it.counterparty.name }.toSet()))
else
recordTransactionLocally(transaction)
transaction
} }
broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn)
return stxn return stxn
} }
catch (e: NotaryException) { else {
if (useTwoPhaseFinality) { val stxn = if (requiresNotarisation) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(transaction.id) notarise().first
if (newPlatformSessions.isNotEmpty()) { } else transaction
broadcastNotaryError(newPlatformSessions, e) recordTransactionLocally(stxn)
} else sleep(Duration.ZERO) // force checkpoint to persist db update. broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn)
} return stxn
throw e
} }
} }
@ -244,16 +252,30 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
private fun recordLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) { private fun recordLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordLocallyAndBroadcast", flowLogic = this) { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordLocallyAndBroadcast", flowLogic = this) {
recordUnnotarisedTransaction(tx) recordUnnotarisedTransaction(tx)
logger.info("Recorded transaction without notary signature locally.")
if (sessions.isEmpty()) return
progressTracker.currentStep = BROADCASTING_PRE_NOTARISATION progressTracker.currentStep = BROADCASTING_PRE_NOTARISATION
broadcast(sessions, tx)
}
}
@Suspendable
private fun finaliseLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction, metadata: FlowTransactionMetadata) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocallyAndBroadcast", flowLogic = this) {
finaliseLocally(tx, metadata = metadata)
progressTracker.currentStep = BROADCASTING
broadcast(sessions, tx)
}
}
@Suspendable
private fun broadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcast", flowLogic = this) {
sessions.forEach { session -> sessions.forEach { session ->
try { try {
logger.debug { "Sending transaction to party $session." } logger.debug { "Sending transaction to party $session." }
subFlow(SendTransactionFlow(session, tx)) subFlow(SendTransactionFlow(session, tx))
} catch (e: UnexpectedFlowEndException) { } catch (e: UnexpectedFlowEndException) {
throw UnexpectedFlowEndException( throw UnexpectedFlowEndException(
"${session.counterparty} has finished prematurely and we're trying to send them a transaction without notary signature. " + "${session.counterparty} has finished prematurely and we're trying to send them a transaction." +
"Did they forget to call ReceiveFinalityFlow? (${e.message})", "Did they forget to call ReceiveFinalityFlow? (${e.message})",
e.cause, e.cause,
e.originalErrorId e.originalErrorId
@ -282,9 +304,20 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
) )
} }
} }
progressTracker.currentStep = FINALISING_TRANSACTION finaliseLocally(transaction, notarySignatures)
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) { }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord) }
@Suspendable
private fun finaliseLocally(stx: SignedTransaction, notarySignatures: List<TransactionSignature> = emptyList(),
metadata: FlowTransactionMetadata? = null) {
progressTracker.currentStep = FINALISING_TRANSACTION
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finaliseLocally", flowLogic = this) {
if (notarySignatures.isEmpty()) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord, metadata!!)
logger.info("Finalised transaction locally.")
} else {
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Finalised transaction locally with notary signature.") logger.info("Finalised transaction locally with notary signature.")
} }
} }
@ -376,22 +409,17 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
serviceHub.myInfo.legalIdentities.first().name, serviceHub.myInfo.legalIdentities.first().name,
statesToRecord, statesToRecord,
sessions.map { it.counterparty.name }.toSet())) sessions.map { it.counterparty.name }.toSet()))
logger.info("Recorded un-notarised transaction locally.")
return tx return tx
} }
} }
@Suspendable @Suspendable
private fun notariseOrRecord(): SignedTransaction { private fun notarise(): Pair<SignedTransaction, List<TransactionSignature>> {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseOrRecord", flowLogic = this) { return serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseOrRecord", flowLogic = this) {
return if (needsNotarySignature(transaction)) { progressTracker.currentStep = NOTARISING
progressTracker.currentStep = NOTARISING val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true)) Pair(transaction + notarySignatures, notarySignatures)
transaction + notarySignatures
} else {
logger.info("No need to notarise this transaction. Recording locally.")
recordTransactionLocally(transaction)
transaction
}
} }
} }
@ -409,7 +437,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
private fun extractExternalParticipants(ltx: LedgerTransaction): Set<Party> { private fun extractExternalParticipants(ltx: LedgerTransaction): Set<Party> {
val participants = ltx.outputStates.flatMap { it.participants } + ltx.inputStates.flatMap { it.participants } val participants = ltx.outputStates.flatMap { it.participants } + ltx.inputStates.flatMap { it.participants }
return groupAbstractPartyByWellKnownParty(serviceHub, participants).keys - serviceHub.myInfo.legalIdentities return groupAbstractPartyByWellKnownParty(serviceHub, participants).keys - serviceHub.myInfo.legalIdentities.toSet()
} }
private fun verifyTx(): LedgerTransaction { private fun verifyTx(): LedgerTransaction {
@ -456,40 +484,49 @@ class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession
private val expectedTxId: SecureHash? = null, private val expectedTxId: SecureHash? = null,
private val statesToRecord: StatesToRecord = ONLY_RELEVANT, private val statesToRecord: StatesToRecord = ONLY_RELEVANT,
private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic<SignedTransaction>() { private val handlePropagatedNotaryError: Boolean? = null) : FlowLogic<SignedTransaction>() {
@Suppress("ComplexMethod") @Suppress("ComplexMethod", "NestedBlockDepth")
@Suspendable @Suspendable
override fun call(): SignedTransaction { override fun call(): SignedTransaction {
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true)) val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true))
val requiresNotarisation = needsNotarySignature(stx)
val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
if (fromTwoPhaseFinalityNode && needsNotarySignature(stx)) { if (fromTwoPhaseFinalityNode) {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) { if (requiresNotarisation) {
logger.debug { "Peer recording transaction without notary signature." } serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx, logger.debug { "Peer recording transaction without notary signature." }
FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord)) (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
} FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
try {
val notarySignatures = otherSideSession.receive<Try<List<TransactionSignature>>>().unwrap { it.getOrThrow() }
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
logger.debug { "Peer received notarised signature." }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
} }
} catch (e: NotaryException) { otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
logger.info("Peer received notary error.") logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?: try {
val notarySignatures = otherSideSession.receive<Try<List<TransactionSignature>>>().unwrap { it.getOrThrow() }
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
logger.debug { "Peer received notarised signature." }
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
logger.info("Peer finalised transaction with notary signature.")
}
} catch (e: NotaryException) {
logger.info("Peer received notary error.")
val overrideHandlePropagatedNotaryError = handlePropagatedNotaryError ?:
(serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY) (serviceHub.cordappProvider.getAppContext().cordapp.targetPlatformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY)
if (overrideHandlePropagatedNotaryError) { if (overrideHandlePropagatedNotaryError) {
(serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id) (serviceHub as ServiceHubCoreInternal).removeUnnotarisedTransaction(stx.id)
sleep(Duration.ZERO) // force checkpoint to persist db update. sleep(Duration.ZERO) // force checkpoint to persist db update.
throw e throw e
}
else {
otherSideSession.receive<Any>() // simulate unexpected flow end
}
} }
else { } else {
otherSideSession.receive<Any>() // simulate unexpected flow end serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransaction", flowLogic = this) {
(serviceHub as ServiceHubCoreInternal).finalizeTransaction(stx, statesToRecord,
FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
logger.info("Peer recorded transaction with recovery metadata.")
} }
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
} }
} else { } else {
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) { serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) {

View File

@ -33,5 +33,5 @@ data class FlowTransactionMetadata(
enum class TransactionStatus { enum class TransactionStatus {
UNVERIFIED, UNVERIFIED,
VERIFIED, VERIFIED,
MISSING_NOTARY_SIG; IN_FLIGHT;
} }

View File

@ -30,13 +30,14 @@ interface ServiceHubCoreInternal : ServiceHub {
val attachmentsClassLoaderCache: AttachmentsClassLoaderCache val attachmentsClassLoaderCache: AttachmentsClassLoaderCache
/** /**
* Stores [SignedTransaction] and participant signatures without the notary signature in the local transaction storage. * Stores [SignedTransaction] and participant signatures without the notary signature in the local transaction storage,
* Optionally add finality flow recovery metadata. * inclusive of flow recovery metadata.
* This is expected to be run within a database transaction. * This is expected to be run within a database transaction.
* *
* @param txn The transaction to record. * @param txn The transaction to record.
* @param metadata Finality flow recovery metadata.
*/ */
fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?= null) fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata)
/** /**
* Removes transaction from data store. * Removes transaction from data store.
@ -54,6 +55,15 @@ interface ServiceHubCoreInternal : ServiceHub {
* @param statesToRecord how the vault should treat the output states of the transaction. * @param statesToRecord how the vault should treat the output states of the transaction.
*/ */
fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord)
/**
* Records a [SignedTransaction] as VERIFIED with flow recovery metadata.
*
* @param txn The transaction to record.
* @param statesToRecord how the vault should treat the output states of the transaction.
* @param metadata Finality flow recovery metadata.
*/
fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata)
} }
interface TransactionsResolver { interface TransactionsResolver {

View File

@ -251,6 +251,9 @@ dependencies {
integrationTestCompile(project(":testing:cordapps:missingmigration")) integrationTestCompile(project(":testing:cordapps:missingmigration"))
testCompile project(':testing:cordapps:dbfailure:dbfworkflows') testCompile project(':testing:cordapps:dbfailure:dbfworkflows')
// used by FinalityFlowErrorHandlingTest
slowIntegrationTestCompile project(':testing:cordapps:cashobservers')
} }
tasks.withType(JavaCompile).configureEach { tasks.withType(JavaCompile).configureEach {

View File

@ -0,0 +1,101 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaRuntimeException
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.test.flows.CashIssueWithObserversFlow
import net.corda.node.services.statemachine.StateMachineErrorHandlingTest
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.flows.waitForAllFlowsToComplete
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.fail
class FinalityFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* Throws an exception after recording an issuance transaction but before broadcasting the transaction to observer sessions.
*
*/
@Test(timeout = 300_000)
fun `error after recording an issuance transaction inside of FinalityFlow generates recovery metadata`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false),
extraCordappPackagesToScan = listOf("net.corda.node.flows", "net.corda.finance.test.flows")) {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME, FINANCE_CORDAPPS + enclosedCordapp())
val rules = """
RULE Set flag when entering receive finality flow
CLASS ${FinalityFlow::class.java.name}
METHOD call
AT ENTRY
IF !flagged("finality_flag")
DO flag("finality_flag"); traceln("Setting finality flag")
ENDRULE
RULE Throw exception when recording transaction
CLASS ${FinalityFlow::class.java.name}
METHOD finaliseLocallyAndBroadcast
AT EXIT
IF flagged("finality_flag")
DO traceln("Throwing exception");
throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
try {
alice.rpc.startFlow(
::CashIssueWithObserversFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
defaultNotaryIdentity,
setOf(charlie.nodeInfo.singleIdentity())
).returnValue.getOrThrow(30.seconds)
fail()
}
catch (e: CordaRuntimeException) {
waitForAllFlowsToComplete(alice)
val txId = alice.rpc.stateMachineRecordedTransactionMappingSnapshot().single().transactionId
alice.rpc.startFlow(::GetFlowTransaction, txId).returnValue.getOrThrow().apply {
assertEquals("V", this.first) // "V" -> VERIFIED
assertEquals(ALICE_NAME.toString(), this.second) // initiator
assertEquals(CHARLIE_NAME.toString(), this.third) // peers
}
}
}
}
}
// Internal use for testing only!!
@StartableByRPC
class GetFlowTransaction(private val txId: SecureHash) : FlowLogic<Triple<String, String, String>>() {
@Suspendable
override fun call(): Triple<String, String, String> {
return serviceHub.jdbcSession().prepareStatement("select * from node_transactions where tx_id = ?")
.apply { setString(1, txId.toString()) }
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
Triple(rs.getString(4), // TransactionStatus
rs.getString(7), // initiator
rs.getString(8)) // participants
}
}
}
}

View File

@ -49,13 +49,16 @@ abstract class StateMachineErrorHandlingTest {
counter = 0 counter = 0
} }
internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) { internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME),
extraCordappPackagesToScan: List<String> = emptyList(),
dsl: DriverDSL.() -> Unit) {
driver( driver(
DriverParameters( DriverParameters(
notarySpecs = listOf(notarySpec), notarySpecs = listOf(notarySpec),
startNodesInProcess = false, startNodesInProcess = false,
inMemoryDB = false, inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true") systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true"),
extraCordappPackagesToScan = extraCordappPackagesToScan
) )
) { ) {
dsl() dsl()

View File

@ -354,6 +354,7 @@ class FlowHospitalTest {
it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds) it.startFlow(::CreateTransactionButDontFinalizeFlow, nodeBHandle.nodeInfo.singleIdentity(), ref3).returnValue.getOrThrow(20.seconds)
} }
waitForAllFlowsToComplete(nodeAHandle) waitForAllFlowsToComplete(nodeAHandle)
waitForAllFlowsToComplete(nodeBHandle)
} }
assertEquals(0, dischargedCounter) assertEquals(0, dischargedCounter)
assertEquals(1, observationCounter) assertEquals(1, observationCounter)

View File

@ -160,7 +160,6 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
vaultService: VaultServiceInternal, vaultService: VaultServiceInternal,
database: CordaPersistence) { database: CordaPersistence) {
database.transaction { database.transaction {
require(sigs.isNotEmpty()) { "No signatures passed in for recording" }
recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) { recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) {
validatedTransactions.finalizeTransactionWithExtraSignatures(it, sigs) validatedTransactions.finalizeTransactionWithExtraSignatures(it, sigs)
} }
@ -227,6 +226,7 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) { override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {
requireSupportedHashType(txn) requireSupportedHashType(txn)
require(sigs.isNotEmpty()) { "No signatures passed in for recording" }
if (txn.coreTransaction is WireTransaction) if (txn.coreTransaction is WireTransaction)
(txn + sigs).verifyRequiredSignatures() (txn + sigs).verifyRequiredSignatures()
finalizeTransactionWithExtraSignatures( finalizeTransactionWithExtraSignatures(
@ -240,7 +240,18 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
) )
} }
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) { override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) {
requireSupportedHashType(txn)
if (txn.coreTransaction is WireTransaction)
txn.verifyRequiredSignatures()
database.transaction {
recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) {
validatedTransactions.finalizeTransaction(txn, metadata)
}
}
}
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) {
if (txn.coreTransaction is WireTransaction) { if (txn.coreTransaction is WireTransaction) {
txn.notary?.let { notary -> txn.notary?.let { notary ->
txn.verifySignaturesExcept(notary.owningKey) txn.verifySignaturesExcept(notary.owningKey)
@ -344,13 +355,13 @@ interface WritableTransactionStorage : TransactionStorage {
fun addTransaction(transaction: SignedTransaction): Boolean fun addTransaction(transaction: SignedTransaction): Boolean
/** /**
* Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG*. * Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG* and inclusive of flow recovery metadata.
* Optionally add finality flow recovery metadata. *
* @param transaction The transaction to be recorded. * @param transaction The transaction to be recorded.
* @param metadata Finality flow recovery metadata. * @param metadata Finality flow recovery metadata.
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists. * @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
*/ */
fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata? = null): Boolean fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean
/** /**
* Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store. * Removes an un-notarised transaction (with a status of *MISSING_TRANSACTION_SIG*) from the data store.
@ -358,6 +369,15 @@ interface WritableTransactionStorage : TransactionStorage {
*/ */
fun removeUnnotarisedTransaction(id: SecureHash): Boolean fun removeUnnotarisedTransaction(id: SecureHash): Boolean
/**
* Add a finalised transaction to the store with flow recovery metadata.
*
* @param transaction The transaction to be recorded.
* @param metadata Finality flow recovery metadata.
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
*/
fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean
/** /**
* Update a previously un-notarised transaction including associated notary signatures. * Update a previously un-notarised transaction including associated notary signatures.
* @param transaction The notarised transaction to be finalized. * @param transaction The notarised transaction to be finalized.

View File

@ -103,13 +103,13 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
enum class TransactionStatus { enum class TransactionStatus {
UNVERIFIED, UNVERIFIED,
VERIFIED, VERIFIED,
MISSING_NOTARY_SIG; IN_FLIGHT;
fun toDatabaseValue(): String { fun toDatabaseValue(): String {
return when (this) { return when (this) {
UNVERIFIED -> "U" UNVERIFIED -> "U"
VERIFIED -> "V" VERIFIED -> "V"
MISSING_NOTARY_SIG -> "M" IN_FLIGHT -> "F"
} }
} }
@ -121,7 +121,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
return when(this) { return when(this) {
UNVERIFIED -> net.corda.core.flows.TransactionStatus.UNVERIFIED UNVERIFIED -> net.corda.core.flows.TransactionStatus.UNVERIFIED
VERIFIED -> net.corda.core.flows.TransactionStatus.VERIFIED VERIFIED -> net.corda.core.flows.TransactionStatus.VERIFIED
MISSING_NOTARY_SIG -> net.corda.core.flows.TransactionStatus.MISSING_NOTARY_SIG IN_FLIGHT -> net.corda.core.flows.TransactionStatus.IN_FLIGHT
} }
} }
@ -130,7 +130,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
return when (databaseValue) { return when (databaseValue) {
"V" -> VERIFIED "V" -> VERIFIED
"U" -> UNVERIFIED "U" -> UNVERIFIED
"M" -> MISSING_NOTARY_SIG "F" -> IN_FLIGHT
else -> throw UnexpectedStatusValueException(databaseValue) else -> throw UnexpectedStatusValueException(databaseValue)
} }
} }
@ -241,7 +241,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED) criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
criteriaUpdate.where(criteriaBuilder.and( criteriaUpdate.where(criteriaBuilder.and(
criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()), criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
criteriaBuilder.and(updateRoot.get<TransactionStatus>(DBTransaction::status.name).`in`(setOf(TransactionStatus.UNVERIFIED, TransactionStatus.MISSING_NOTARY_SIG)) criteriaBuilder.and(updateRoot.get<TransactionStatus>(DBTransaction::status.name).`in`(setOf(TransactionStatus.UNVERIFIED, TransactionStatus.IN_FLIGHT))
))) )))
criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant()) criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
val update = session.createQuery(criteriaUpdate) val update = session.createQuery(criteriaUpdate)
@ -254,20 +254,15 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
updateTransaction(transaction.id) updateTransaction(transaction.id)
} }
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?) = override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) =
database.transaction { addTransaction(transaction, metadata, TransactionStatus.IN_FLIGHT) {
txStorage.locked { false
val cacheValue = TxCacheValue(transaction, status = TransactionStatus.MISSING_NOTARY_SIG, metadata = metadata)
val added = addWithDuplicatesAllowed(transaction.id, cacheValue)
if (added) {
logger.info ("Transaction ${transaction.id} recorded as un-notarised.")
} else {
logger.info("Transaction ${transaction.id} (un-notarised) already exists so no need to record.")
}
added
}
} }
override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) =
addTransaction(transaction, metadata) {
false
}
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return database.transaction { return database.transaction {
val session = currentDBSession() val session = currentDBSession()
@ -276,7 +271,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
val root = delete.from(DBTransaction::class.java) val root = delete.from(DBTransaction::class.java)
delete.where(criteriaBuilder.and( delete.where(criteriaBuilder.and(
criteriaBuilder.equal(root.get<String>(DBTransaction::txId.name), id.toString()), criteriaBuilder.equal(root.get<String>(DBTransaction::txId.name), id.toString()),
criteriaBuilder.equal(root.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG) criteriaBuilder.equal(root.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.IN_FLIGHT)
)) ))
if (session.createQuery(delete).executeUpdate() != 0) { if (session.createQuery(delete).executeUpdate() != 0) {
txStorage.locked { txStorage.locked {
@ -294,16 +289,21 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
finalizeTransactionWithExtraSignatures(transaction.id, signatures) finalizeTransactionWithExtraSignatures(transaction.id, signatures)
} }
private fun addTransaction(transaction: SignedTransaction, updateFn: (SecureHash) -> Boolean): Boolean { private fun addTransaction(transaction: SignedTransaction,
metadata: FlowTransactionMetadata? = null,
status: TransactionStatus = TransactionStatus.VERIFIED,
updateFn: (SecureHash) -> Boolean): Boolean {
return database.transaction { return database.transaction {
txStorage.locked { txStorage.locked {
val cachedValue = TxCacheValue(transaction, TransactionStatus.VERIFIED) val cachedValue = TxCacheValue(transaction, status, metadata)
val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateFn(k) } val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateFn(k) }
if (addedOrUpdated) { if (addedOrUpdated) {
logger.debug { "Transaction ${transaction.id} has been recorded as verified" } logger.debug { "Transaction ${transaction.id} has been recorded as $status" }
onNewTx(transaction) if (status.isVerified())
onNewTx(transaction)
true
} else { } else {
logger.debug { "Transaction ${transaction.id} is already recorded as verified, so no need to re-record" } logger.debug { "Transaction ${transaction.id} is already recorded as $status, so no need to re-record" }
false false
} }
} }
@ -320,7 +320,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED) criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
criteriaUpdate.where(criteriaBuilder.and( criteriaUpdate.where(criteriaBuilder.and(
criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()), criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG) criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.IN_FLIGHT)
)) ))
criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant()) criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
val update = session.createQuery(criteriaUpdate) val update = session.createQuery(criteriaUpdate)
@ -360,7 +360,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
txStorage.locked { txStorage.locked {
val cacheValue = TxCacheValue(transaction, status = TransactionStatus.UNVERIFIED) val cacheValue = TxCacheValue(transaction, status = TransactionStatus.UNVERIFIED)
val added = addWithDuplicatesAllowed(transaction.id, cacheValue) { k, v, existingEntry -> val added = addWithDuplicatesAllowed(transaction.id, cacheValue) { k, v, existingEntry ->
if (existingEntry.status == TransactionStatus.MISSING_NOTARY_SIG) { if (existingEntry.status == TransactionStatus.IN_FLIGHT) {
session.merge(toPersistentEntity(k, v)) session.merge(toPersistentEntity(k, v))
true true
} else false } else false

View File

@ -801,10 +801,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
return true return true
} }
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean { override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean {
database.transaction { database.transaction {
records.add(TxRecord.Add(transaction)) records.add(TxRecord.Add(transaction))
delegate.addUnnotarisedTransaction(transaction) delegate.addUnnotarisedTransaction(transaction, metadata)
} }
return true return true
} }
@ -815,6 +815,13 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
} }
} }
override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean {
database.transaction {
delegate.finalizeTransaction(transaction, metadata)
}
return true
}
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) : Boolean { override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) : Boolean {
database.transaction { database.transaction {
delegate.finalizeTransactionWithExtraSignatures(transaction, signatures) delegate.finalizeTransactionWithExtraSignatures(transaction, signatures)

View File

@ -19,7 +19,7 @@ import net.corda.core.transactions.WireTransaction
import net.corda.node.CordaClock import net.corda.node.CordaClock
import net.corda.node.MutableClock import net.corda.node.MutableClock
import net.corda.node.SimpleClock import net.corda.node.SimpleClock
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.MISSING_NOTARY_SIG import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.IN_FLIGHT
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.UNVERIFIED import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.UNVERIFIED
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.VERIFIED import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.VERIFIED
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
@ -113,8 +113,8 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now) val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock) newTransactionStorage(clock = transactionClock)
val transaction = newTransaction() val transaction = newTransaction()
transactionStorage.addUnnotarisedTransaction(transaction) transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
@ -125,7 +125,7 @@ class DBTransactionStorageTests {
val transaction = newTransaction() val transaction = newTransaction()
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name, StatesToRecord.ALL_VISIBLE, setOf(BOB_PARTY.name))) transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name, StatesToRecord.ALL_VISIBLE, setOf(BOB_PARTY.name)))
val txn = readTransactionFromDB(transaction.id) val txn = readTransactionFromDB(transaction.id)
assertEquals(MISSING_NOTARY_SIG, txn.status) assertEquals(IN_FLIGHT, txn.status)
assertEquals(StatesToRecord.ALL_VISIBLE, txn.statesToRecord) assertEquals(StatesToRecord.ALL_VISIBLE, txn.statesToRecord)
assertEquals(ALICE_NAME.toString(), txn.initiator) assertEquals(ALICE_NAME.toString(), txn.initiator)
assertEquals(listOf(BOB_NAME.toString()), txn.participants) assertEquals(listOf(BOB_NAME.toString()), txn.participants)
@ -144,15 +144,46 @@ class DBTransactionStorageTests {
} }
} }
@Test(timeout = 300_000)
fun `finalize transaction after recording transaction as un-notarised`() {
val now = Instant.ofEpochSecond(333444555L)
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, emptyList())
readTransactionFromDB(transaction.id).let {
assertSignatures(it.transaction, it.signatures, transaction.sigs)
assertEquals(VERIFIED, it.status)
}
}
@Test(timeout = 300_000)
fun `finalize transaction with recovery metadata`() {
val now = Instant.ofEpochSecond(333444555L)
val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false)
transactionStorage.finalizeTransaction(transaction,
FlowTransactionMetadata(ALICE_NAME))
readTransactionFromDB(transaction.id).let {
assertEquals(VERIFIED, it.status)
assertEquals(ALICE_NAME.toString(), it.initiator)
assertEquals(StatesToRecord.ONLY_RELEVANT, it.statesToRecord)
}
}
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `finalize transaction with extra signatures after recording transaction as un-notarised`() { fun `finalize transaction with extra signatures after recording transaction as un-notarised`() {
val now = Instant.ofEpochSecond(333444555L) val now = Instant.ofEpochSecond(333444555L)
val transactionClock = TransactionClock(now) val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock) newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false) val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction) transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
assertNull(transactionStorage.getTransaction(transaction.id)) assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
val notarySig = notarySig(transaction.id) val notarySig = notarySig(transaction.id)
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig)) transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig))
readTransactionFromDB(transaction.id).let { readTransactionFromDB(transaction.id).let {
@ -167,9 +198,9 @@ class DBTransactionStorageTests {
val transactionClock = TransactionClock(now) val transactionClock = TransactionClock(now)
newTransactionStorage(clock = transactionClock) newTransactionStorage(clock = transactionClock)
val transaction = newTransaction(notarySig = false) val transaction = newTransaction(notarySig = false)
transactionStorage.addUnnotarisedTransaction(transaction) transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name))
assertNull(transactionStorage.getTransaction(transaction.id)) assertNull(transactionStorage.getTransaction(transaction.id))
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status) assertEquals(IN_FLIGHT, readTransactionFromDB(transaction.id).status)
assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id)) assertEquals(true, transactionStorage.removeUnnotarisedTransaction(transaction.id))
assertFailsWith<AssertionError> { readTransactionFromDB(transaction.id).status } assertFailsWith<AssertionError> { readTransactionFromDB(transaction.id).status }
@ -201,8 +232,8 @@ class DBTransactionStorageTests {
val transactionWithoutNotarySig = newTransaction(notarySig = false) val transactionWithoutNotarySig = newTransaction(notarySig = false)
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow) // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig) transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig, FlowTransactionMetadata(ALICE.party.name))
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySig.id).status) assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySig.id).status)
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow) // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
val notarySig = notarySig(transactionWithoutNotarySig.id) val notarySig = notarySig(transactionWithoutNotarySig.id)
@ -232,8 +263,8 @@ class DBTransactionStorageTests {
val transactionWithoutNotarySigs = newTransaction(notarySig = false) val transactionWithoutNotarySigs = newTransaction(notarySig = false)
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow) // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs) transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs, FlowTransactionMetadata(ALICE.party.name))
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySigs.id).status) assertEquals(IN_FLIGHT, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow) // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
val notarySig = notarySig(transactionWithoutNotarySigs.id) val notarySig = notarySig(transactionWithoutNotarySigs.id)

View File

@ -102,6 +102,7 @@ include 'testing:cordapps:dbfailure:dbfcontracts'
include 'testing:cordapps:dbfailure:dbfworkflows' include 'testing:cordapps:dbfailure:dbfworkflows'
include 'testing:cordapps:missingmigration' include 'testing:cordapps:missingmigration'
include 'testing:cordapps:sleeping' include 'testing:cordapps:sleeping'
include 'testing:cordapps:cashobservers'
// Common libraries - start // Common libraries - start
include 'common-validation' include 'common-validation'

View File

@ -0,0 +1,17 @@
apply plugin: 'kotlin'
//apply plugin: 'net.corda.plugins.cordapp'
//apply plugin: 'net.corda.plugins.quasar-utils'
dependencies {
compile project(":core")
compile project(':finance:workflows')
}
jar {
baseName "testing-cashobservers-cordapp"
manifest {
// This JAR is part of Corda's testing framework.
// Driver will not include it as part of an out-of-process node.
attributes('Corda-Testing': true)
}
}

View File

@ -0,0 +1,48 @@
package net.corda.finance.test.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.AbstractCashFlow
import net.corda.finance.issuedBy
import java.util.Currency
@StartableByRPC
@InitiatingFlow
class CashIssueWithObserversFlow(private val amount: Amount<Currency>,
private val issuerBankPartyRef: OpaqueBytes,
private val notary: Party,
private val observers: Set<Party>) : AbstractCashFlow<AbstractCashFlow.Result>(tracker()) {
@Suspendable
override fun call(): Result {
progressTracker.currentStep = Companion.GENERATING_TX
val builder = TransactionBuilder(notary)
val issuer = ourIdentity.ref(issuerBankPartyRef)
val signers = Cash().generateIssue(builder, amount.issuedBy(issuer), ourIdentity, notary)
progressTracker.currentStep = Companion.SIGNING_TX
val tx = serviceHub.signInitialTransaction(builder, signers)
progressTracker.currentStep = Companion.FINALISING_TX
val observerSessions = observers.map { initiateFlow(it) }
val notarised = finaliseTx(tx, observerSessions, "Unable to notarise issue")
return Result(notarised, ourIdentity)
}
}
@InitiatedBy(CashIssueWithObserversFlow::class)
class CashIssueReceiverFlowWithObservers(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) {
subFlow(ReceiveFinalityFlow(otherSide))
}
}
}

View File

@ -55,14 +55,17 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
} }
} }
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean { override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata): Boolean {
return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.MISSING_NOTARY_SIG)) == null return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.IN_FLIGHT)) == null
} }
override fun removeUnnotarisedTransaction(id: SecureHash): Boolean { override fun removeUnnotarisedTransaction(id: SecureHash): Boolean {
return txns.remove(id) != null return txns.remove(id) != null
} }
override fun finalizeTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata) =
addTransaction(transaction)
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>): Boolean { override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>): Boolean {
val current = txns.replace(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED)) val current = txns.replace(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED))
return if (current != null) { return if (current != null) {

View File

@ -139,11 +139,13 @@ data class TestTransactionDSLInterpreter private constructor(
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory()) override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory())
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {} override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata) {}
override fun removeUnnotarisedTransaction(id: SecureHash) {} override fun removeUnnotarisedTransaction(id: SecureHash) {}
override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {} override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {}
override fun finalizeTransaction(txn: SignedTransaction, statesToRecord: StatesToRecord, metadata: FlowTransactionMetadata) {}
} }
private fun copy(): TestTransactionDSLInterpreter = private fun copy(): TestTransactionDSLInterpreter =