mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
ENT-6875 Two Phase Finality Flow - improve ledger consistency & recoverability (#7290)
This commit is contained in:
parent
021c70143b
commit
d2900d54ab
@ -1,30 +1,82 @@
|
||||
package net.corda.coretests.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.natpryce.hamkrest.and
|
||||
import com.natpryce.hamkrest.assertion.assertThat
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.PartyAndReference
|
||||
import net.corda.core.contracts.StateAndContract
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.NotaryError
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotarySigCheck
|
||||
import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.flows.ReceiveTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.internal.PLATFORM_VERSION
|
||||
import net.corda.core.internal.PlatformVersionSwitches
|
||||
import net.corda.core.internal.ServiceHubCoreInternal
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.coretests.flows.WithFinality.FinalityInvoker
|
||||
import net.corda.finance.POUNDS
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.coretesting.internal.matchers.flow.willReturn
|
||||
import net.corda.coretesting.internal.matchers.flow.willThrow
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.coretests.flows.WithFinality.FinalityInvoker
|
||||
import net.corda.finance.GBP
|
||||
import net.corda.finance.POUNDS
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.finance.flows.CashPaymentFlow
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.CHARLIE_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.CustomCordapp
|
||||
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
|
||||
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
|
||||
import net.corda.testing.node.internal.FINANCE_WORKFLOWS_CORDAPP
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.MOCK_VERSION_INFO
|
||||
import net.corda.testing.node.internal.TestCordappInternal
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import java.sql.SQLException
|
||||
import java.util.Random
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
import kotlin.test.fail
|
||||
|
||||
class FinalityFlowTests : WithFinality {
|
||||
companion object {
|
||||
private val CHARLIE = TestIdentity(CHARLIE_NAME, 90).party
|
||||
}
|
||||
|
||||
override val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(FINANCE_CONTRACTS_CORDAPP, enclosedCordapp(),
|
||||
override val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP, DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(),
|
||||
CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
|
||||
|
||||
private val aliceNode = makeNode(ALICE_NAME)
|
||||
@ -62,7 +114,7 @@ class FinalityFlowTests : WithFinality {
|
||||
val stx = aliceNode.issuesCashTo(oldBob)
|
||||
@Suppress("DEPRECATION")
|
||||
aliceNode.startFlowAndRunNetwork(FinalityFlow(stx)).resultFuture.getOrThrow()
|
||||
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
|
||||
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@ -76,12 +128,262 @@ class FinalityFlowTests : WithFinality {
|
||||
oldRecipients = setOf(oldBob.info.singleIdentity())
|
||||
)).resultFuture
|
||||
resultFuture.getOrThrow()
|
||||
assertThat(newCharlie.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
|
||||
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
|
||||
assertThat(newCharlie.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
}
|
||||
|
||||
private fun createBob(cordapps: List<TestCordappInternal> = emptyList()): TestStartedNode {
|
||||
return mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, additionalCordapps = cordapps))
|
||||
@Test(timeout=300_000)
|
||||
fun `two phase finality flow transaction`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
|
||||
|
||||
val stx = aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
|
||||
aliceNode.startFlowAndRunNetwork(CashPaymentFlow(Amount(100, GBP), bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `two phase finality flow initiator to pre-2PF peer`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
|
||||
|
||||
val stx = aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
|
||||
aliceNode.startFlowAndRunNetwork(CashPaymentFlow(Amount(100, GBP), bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `pre-2PF initiator to two phase finality flow peer`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
|
||||
|
||||
val stx = bobNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
|
||||
bobNode.startFlowAndRunNetwork(CashPaymentFlow(Amount(100, GBP), aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `two phase finality flow double spend transaction`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
|
||||
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
val (_, txnDsStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
|
||||
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusAlice)
|
||||
val (_, txnDsStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
|
||||
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusBob)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `two phase finality flow double spend transaction from pre-2PF initiator`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
|
||||
|
||||
val ref = bobNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `two phase finality flow double spend transaction to pre-2PF peer`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
|
||||
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
|
||||
|
||||
try {
|
||||
aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
}
|
||||
catch (e: NotaryException) {
|
||||
val stxId = (e.error as NotaryError.Conflict).txId
|
||||
val (_, txnDsStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
|
||||
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusAlice)
|
||||
assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `two phase finality flow speedy spender`() {
|
||||
val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
|
||||
|
||||
val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
|
||||
val notarisedStxn1 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
|
||||
val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob)
|
||||
|
||||
// now lets attempt a new spend with the new output of the previous transaction
|
||||
val newStateRef = notarisedStxn1.coreTransaction.outRef<DummyContract.SingleOwnerState>(1)
|
||||
val notarisedStxn2 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(newStateRef, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
|
||||
|
||||
// the original transaction is now finalised at Bob (despite the original flow not completing) because Bob resolved the
|
||||
// original transaction from Alice in the second transaction (and Alice had already notarised and finalised the original transaction)
|
||||
val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBobAgain)
|
||||
|
||||
val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2)
|
||||
val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
|
||||
assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob2)
|
||||
|
||||
// Validate attempt at flow finalisation by Bob has no effect on outcome.
|
||||
val finaliseStxn1 = bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow()
|
||||
val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionInternal(finaliseStxn1.id) ?: fail()
|
||||
assertEquals(TransactionStatus.VERIFIED, txnStatusBobYetAgain)
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<DummyContract.SingleOwnerState>>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): StateAndRef<DummyContract.SingleOwnerState> {
|
||||
val partyAndReference = PartyAndReference(ourIdentity, OpaqueBytes.of(1))
|
||||
val txBuilder = DummyContract.generateInitial(Random().nextInt(), notary, partyAndReference)
|
||||
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
|
||||
val notarised = subFlow(FinalityFlow(signedTransaction, emptySet<FlowSession>()))
|
||||
return notarised.coreTransaction.outRef(0)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class SpendFlow(private val stateAndRef: StateAndRef<DummyContract.SingleOwnerState>, private val newOwner: Party) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val txBuilder = DummyContract.move(stateAndRef, newOwner)
|
||||
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
|
||||
val sessionWithCounterParty = initiateFlow(newOwner)
|
||||
sessionWithCounterParty.sendAndReceive<String>("initial-message")
|
||||
return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(SpendFlow::class)
|
||||
class AcceptSpendFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
otherSide.receive<String>()
|
||||
otherSide.send("initial-response")
|
||||
|
||||
subFlow(ReceiveFinalityFlow(otherSide))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This flow allows an Initiator to race ahead of a Receiver when using Two Phase Finality.
|
||||
* The initiator transaction will be finalised, so output states can be used in a follow-up transaction.
|
||||
* The receiver transaction will not be finalised, causing ledger inconsistency.
|
||||
*/
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class SpeedySpendFlow(private val stateAndRef: StateAndRef<DummyContract.SingleOwnerState>, private val newOwner: Party) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val newState = StateAndContract(DummyContract.SingleOwnerState(99999, ourIdentity), DummyContract.PROGRAM_ID)
|
||||
val txBuilder = DummyContract.move(stateAndRef, newOwner).withItems(newState)
|
||||
val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
|
||||
val sessionWithCounterParty = initiateFlow(newOwner)
|
||||
try {
|
||||
subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
|
||||
}
|
||||
catch (e: FinalisationFailedException) {
|
||||
// expected (transaction has been notarised by Initiator)
|
||||
return e.notarisedTxn
|
||||
}
|
||||
return signedTransaction
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(SpeedySpendFlow::class)
|
||||
class AcceptSpeedySpendFlow(private val otherSideSession: FlowSession) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Mimic ReceiveFinalityFlow but fail to finalise
|
||||
try {
|
||||
val stx = subFlow(ReceiveTransactionFlow(otherSideSession,
|
||||
checkSufficientSignatures = false, statesToRecord = StatesToRecord.ONLY_RELEVANT, deferredAck = true))
|
||||
require(NotarySigCheck.needsNotarySignature(stx))
|
||||
logger.info("Peer recording transaction without notary signature.")
|
||||
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
|
||||
FlowTransactionMetadata(otherSideSession.counterparty.name, StatesToRecord.ONLY_RELEVANT))
|
||||
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
|
||||
logger.info("Peer recorded transaction without notary signature.")
|
||||
|
||||
val notarySignatures = otherSideSession.receive<List<TransactionSignature>>()
|
||||
.unwrap { it }
|
||||
logger.info("Peer received notarised signature.")
|
||||
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx + notarySignatures, notarySignatures, StatesToRecord.ONLY_RELEVANT)
|
||||
throw FinalisationFailedException(stx + notarySignatures)
|
||||
}
|
||||
catch (e: SQLException) {
|
||||
logger.error("Peer failure upon recording or finalising transaction: $e")
|
||||
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
|
||||
throw UnexpectedFlowEndException("Peer failure upon recording or finalising transaction.", e.cause)
|
||||
}
|
||||
catch (uae: TransactionVerificationException.UntrustedAttachmentsException) {
|
||||
logger.error("Peer failure upon receiving transaction: $uae")
|
||||
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
|
||||
throw uae
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FinaliseSpeedySpendFlow(val id: SecureHash, val sigs: List<TransactionSignature>) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Mimic ReceiveFinalityFlow finalisation
|
||||
val stx = serviceHub.validatedTransactions.getTransaction(id) ?: throw FlowException("Missing transaction: $id")
|
||||
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx + sigs, sigs, StatesToRecord.ONLY_RELEVANT)
|
||||
logger.info("Peer finalised transaction with notary signature.")
|
||||
|
||||
return stx + sigs
|
||||
}
|
||||
}
|
||||
|
||||
class FinalisationFailedException(val notarisedTxn: SignedTransaction) : FlowException("Failed to finalise transaction with notary signature.")
|
||||
|
||||
private fun createBob(cordapps: List<TestCordappInternal> = emptyList(), platformVersion: Int = PLATFORM_VERSION): TestStartedNode {
|
||||
return mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, additionalCordapps = cordapps,
|
||||
version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion)))
|
||||
}
|
||||
|
||||
private fun TestStartedNode.issuesCashTo(recipient: TestStartedNode): SignedTransaction {
|
||||
|
@ -3,9 +3,14 @@ package net.corda.core.flows
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.CordaInternal
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.isFulfilledBy
|
||||
import net.corda.core.flows.NotarySigCheck.needsNotarySignature
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
import net.corda.core.internal.PlatformVersionSwitches
|
||||
import net.corda.core.internal.ServiceHubCoreInternal
|
||||
import net.corda.core.internal.pushToLoggingContext
|
||||
import net.corda.core.internal.telemetry.telemetryServiceInternal
|
||||
import net.corda.core.internal.warnOnce
|
||||
@ -15,6 +20,7 @@ import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
/**
|
||||
* Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction
|
||||
@ -133,10 +139,18 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
override fun childProgressTracker() = NotaryFlow.Client.tracker()
|
||||
}
|
||||
|
||||
object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
|
||||
@Suppress("ClassNaming")
|
||||
object RECORD_UNNOTARISED : ProgressTracker.Step("Recording un-notarised transaction locally")
|
||||
@Suppress("ClassNaming")
|
||||
object BROADCASTING_PRE_NOTARISATION : ProgressTracker.Step("Broadcasting un-notarised transaction")
|
||||
@Suppress("ClassNaming")
|
||||
object BROADCASTING_POST_NOTARISATION : ProgressTracker.Step("Broadcasting notary signature")
|
||||
@Suppress("ClassNaming")
|
||||
object FINALISING_TRANSACTION : ProgressTracker.Step("Finalising transaction locally")
|
||||
object BROADCASTING : ProgressTracker.Step("Broadcasting notarised transaction to other participants")
|
||||
|
||||
@JvmStatic
|
||||
fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
|
||||
fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, FINALISING_TRANSACTION, BROADCASTING)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -155,7 +169,6 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
// the point where subFlow is invoked, as that minimizes the checkpointing work to be done.
|
||||
//
|
||||
// Lookup the resolved transactions and use them to map each signed transaction to the list of participants.
|
||||
// Then send to the notary if needed, record locally and distribute.
|
||||
|
||||
transaction.pushToLoggingContext()
|
||||
logCommandData()
|
||||
@ -173,32 +186,121 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
}
|
||||
}
|
||||
|
||||
val notarised = notariseAndRecord()
|
||||
// Recoverability
|
||||
// As of platform version 13 we introduce a 2-phase finality protocol whereby
|
||||
// - record un-notarised transaction locally and broadcast to external participants to record
|
||||
// - notarise transaction
|
||||
// - broadcast notary signature to external participants (finalise remotely)
|
||||
// - finalise locally
|
||||
|
||||
progressTracker.currentStep = BROADCASTING
|
||||
val (oldPlatformSessions, newPlatformSessions) = sessions.partition {
|
||||
serviceHub.networkMapCache.getNodeByLegalName(it.counterparty.name)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
}
|
||||
|
||||
if (newApi) {
|
||||
oldV3Broadcast(notarised, oldParticipants.toSet())
|
||||
for (session in sessions) {
|
||||
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
if (useTwoPhaseFinality && needsNotarySignature(transaction)) {
|
||||
recordLocallyAndBroadcast(newPlatformSessions, transaction)
|
||||
}
|
||||
|
||||
val stxn = notariseOrRecord()
|
||||
val notarySignatures = stxn.sigs - transaction.sigs.toSet()
|
||||
if (notarySignatures.isNotEmpty()) {
|
||||
if (useTwoPhaseFinality && newPlatformSessions.isNotEmpty()) {
|
||||
broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures)
|
||||
}
|
||||
else {
|
||||
progressTracker.currentStep = FINALISING_TRANSACTION
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
|
||||
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord)
|
||||
logger.info("Finalised transaction locally.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!useTwoPhaseFinality || !needsNotarySignature(transaction)) {
|
||||
broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn)
|
||||
} else if (useTwoPhaseFinality && oldPlatformSessions.isNotEmpty()) {
|
||||
broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn)
|
||||
}
|
||||
|
||||
return stxn
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun recordLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordLocallyAndBroadcast", flowLogic = this) {
|
||||
recordUnnotarisedTransaction(tx)
|
||||
logger.info("Recorded transaction without notary signature locally.")
|
||||
progressTracker.currentStep = BROADCASTING_PRE_NOTARISATION
|
||||
sessions.forEach { session ->
|
||||
try {
|
||||
subFlow(SendTransactionFlow(session, notarised))
|
||||
logger.info("Party ${session.counterparty} received the transaction.")
|
||||
logger.debug { "Sending transaction to party $session." }
|
||||
subFlow(SendTransactionFlow(session, tx))
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them a transaction without notary signature. " +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
oldV3Broadcast(notarised, (externalTxParticipants + oldParticipants).toSet())
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("All parties received the transaction successfully.")
|
||||
@Suspendable
|
||||
private fun broadcastSignaturesAndFinalise(sessions: Collection<FlowSession>, notarySignatures: List<TransactionSignature>) {
|
||||
progressTracker.currentStep = BROADCASTING_POST_NOTARISATION
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastSignaturesAndFinalise", flowLogic = this) {
|
||||
logger.info("Transaction notarised and broadcasting notary signature.")
|
||||
sessions.forEach { session ->
|
||||
try {
|
||||
logger.debug { "Sending notary signature to party $session." }
|
||||
session.send(notarySignatures)
|
||||
// remote will finalise txn with notary signature
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them the notary signature. " +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
}
|
||||
progressTracker.currentStep = FINALISING_TRANSACTION
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
|
||||
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord)
|
||||
logger.info("Finalised transaction locally with notary signature.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return notarised
|
||||
@Suspendable
|
||||
private fun broadcastToOtherParticipants(externalTxParticipants: Set<Party>, sessions: Collection<FlowSession>, tx: SignedTransaction) {
|
||||
if (externalTxParticipants.isEmpty() && sessions.isEmpty() && oldParticipants.isEmpty()) return
|
||||
progressTracker.currentStep = BROADCASTING
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastToOtherParticipants", flowLogic = this) {
|
||||
logger.info("Broadcasting complete transaction to other participants.")
|
||||
if (newApi) {
|
||||
oldV3Broadcast(tx, oldParticipants.toSet())
|
||||
for (session in sessions) {
|
||||
try {
|
||||
logger.debug { "Sending transaction to party $session." }
|
||||
subFlow(SendTransactionFlow(session, tx))
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
oldV3Broadcast(tx, (externalTxParticipants + oldParticipants).toSet())
|
||||
}
|
||||
logger.info("Broadcasted complete transaction to other participants.")
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -221,22 +323,39 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun notariseAndRecord(): SignedTransaction {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord", flowLogic = this) {
|
||||
val notarised = if (needsNotarySignature(transaction)) {
|
||||
private fun recordTransactionLocally(tx: SignedTransaction): SignedTransaction {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactionLocally", flowLogic = this) {
|
||||
serviceHub.recordTransactions(statesToRecord, listOf(tx))
|
||||
logger.info("Recorded transaction locally.")
|
||||
return tx
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun recordUnnotarisedTransaction(tx: SignedTransaction): SignedTransaction {
|
||||
progressTracker.currentStep = RECORD_UNNOTARISED
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
|
||||
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx,
|
||||
FlowTransactionMetadata(
|
||||
serviceHub.myInfo.legalIdentities.first().name,
|
||||
statesToRecord,
|
||||
sessions.map { it.counterparty.name }.toSet()))
|
||||
return tx
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun notariseOrRecord(): SignedTransaction {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseOrRecord", flowLogic = this) {
|
||||
return if (needsNotarySignature(transaction)) {
|
||||
progressTracker.currentStep = NOTARISING
|
||||
val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
|
||||
transaction + notarySignatures
|
||||
} else {
|
||||
logger.info("No need to notarise this transaction.")
|
||||
logger.info("No need to notarise this transaction. Recording locally.")
|
||||
recordTransactionLocally(transaction)
|
||||
transaction
|
||||
}
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord:recordTransactions", flowLogic = this) {
|
||||
logger.info("Recording transaction locally.")
|
||||
serviceHub.recordTransactions(statesToRecord, listOf(notarised))
|
||||
logger.info("Recorded transaction locally successfully.")
|
||||
}
|
||||
return notarised
|
||||
}
|
||||
}
|
||||
|
||||
@ -268,6 +387,20 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
}
|
||||
}
|
||||
|
||||
object NotarySigCheck {
|
||||
fun needsNotarySignature(stx: SignedTransaction): Boolean {
|
||||
val wtx = stx.tx
|
||||
val needsNotarisation = wtx.inputs.isNotEmpty() || wtx.references.isNotEmpty() || wtx.timeWindow != null
|
||||
return needsNotarisation && hasNoNotarySignature(stx)
|
||||
}
|
||||
|
||||
private fun hasNoNotarySignature(stx: SignedTransaction): Boolean {
|
||||
val notaryKey = stx.tx.notary?.owningKey
|
||||
val signers = stx.sigs.asSequence().map { it.by }.toSet()
|
||||
return notaryKey?.isFulfilledBy(signers) != true
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The receiving counterpart to [FinalityFlow].
|
||||
*
|
||||
@ -285,15 +418,34 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
|
||||
private val expectedTxId: SecureHash? = null,
|
||||
private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
|
||||
@Suppress("ComplexMethod")
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
return subFlow(object : ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = true, statesToRecord = statesToRecord) {
|
||||
override fun checkBeforeRecording(stx: SignedTransaction) {
|
||||
require(expectedTxId == null || expectedTxId == stx.id) {
|
||||
"We expected to receive transaction with ID $expectedTxId but instead got ${stx.id}. Transaction was" +
|
||||
"not recorded and nor its states sent to the vault."
|
||||
}
|
||||
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true))
|
||||
|
||||
val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalName(otherSideSession.counterparty.name)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
if (fromTwoPhaseFinalityNode && needsNotarySignature(stx)) {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
|
||||
logger.debug { "Peer recording transaction without notary signature." }
|
||||
(serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
|
||||
FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
|
||||
}
|
||||
})
|
||||
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
|
||||
logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
|
||||
val notarySignatures = otherSideSession.receive<List<TransactionSignature>>()
|
||||
.unwrap { it }
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
|
||||
logger.debug { "Peer received notarised signature." }
|
||||
(serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
|
||||
logger.info("Peer finalised transaction with notary signature.")
|
||||
}
|
||||
} else {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) {
|
||||
serviceHub.recordTransactions(statesToRecord, setOf(stx))
|
||||
}
|
||||
otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
|
||||
logger.info("Peer successfully recorded received transaction.")
|
||||
}
|
||||
return stx
|
||||
}
|
||||
}
|
||||
|
37
core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt
Normal file
37
core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt
Normal file
@ -0,0 +1,37 @@
|
||||
package net.corda.core.flows
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* Flow data object representing key information required for recovery.
|
||||
*/
|
||||
|
||||
@CordaSerializable
|
||||
data class FlowTransaction(
|
||||
val stateMachineRunId: StateMachineRunId,
|
||||
val txId: String,
|
||||
val status: TransactionStatus,
|
||||
val signatures: ByteArray?,
|
||||
val timestamp: Instant,
|
||||
val metadata: FlowTransactionMetadata?) {
|
||||
|
||||
fun isInitiator(myCordaX500Name: CordaX500Name) =
|
||||
this.metadata?.initiator == myCordaX500Name
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class FlowTransactionMetadata(
|
||||
val initiator: CordaX500Name,
|
||||
val statesToRecord: StatesToRecord? = StatesToRecord.ONLY_RELEVANT,
|
||||
val peers: Set<CordaX500Name>? = null
|
||||
)
|
||||
|
||||
@CordaSerializable
|
||||
enum class TransactionStatus {
|
||||
UNVERIFIED,
|
||||
VERIFIED,
|
||||
MISSING_NOTARY_SIG;
|
||||
}
|
@ -27,10 +27,20 @@ import java.security.SignatureException
|
||||
* @property otherSideSession session to the other side which is calling [SendTransactionFlow].
|
||||
* @property checkSufficientSignatures if true checks all required signatures are present. See [SignedTransaction.verify].
|
||||
* @property statesToRecord which transaction states should be recorded in the vault, if any.
|
||||
* @property deferredAck if set then the caller of this flow is responsible for explicitly sending a FetchDataFlow.Request.End
|
||||
* acknowledgement to indicate transaction resolution is complete. See usage within [FinalityFlow].
|
||||
* Not recommended for 3rd party use.
|
||||
*/
|
||||
open class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
|
||||
private val checkSufficientSignatures: Boolean = true,
|
||||
private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic<SignedTransaction>() {
|
||||
open class ReceiveTransactionFlow constructor(private val otherSideSession: FlowSession,
|
||||
private val checkSufficientSignatures: Boolean = true,
|
||||
private val statesToRecord: StatesToRecord = StatesToRecord.NONE,
|
||||
private val deferredAck: Boolean = false) : FlowLogic<SignedTransaction>() {
|
||||
@JvmOverloads constructor(
|
||||
otherSideSession: FlowSession,
|
||||
checkSufficientSignatures: Boolean = true,
|
||||
statesToRecord: StatesToRecord = StatesToRecord.NONE
|
||||
) : this(otherSideSession, checkSufficientSignatures, statesToRecord, false)
|
||||
|
||||
@Suppress("KDocMissingDocumentation")
|
||||
@Suspendable
|
||||
@Throws(SignatureException::class,
|
||||
@ -47,7 +57,7 @@ open class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSid
|
||||
it.pushToLoggingContext()
|
||||
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
|
||||
checkParameterHash(it.networkParametersHash)
|
||||
subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord))
|
||||
subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord, deferredAck))
|
||||
logger.info("Transaction dependencies resolution completed.")
|
||||
try {
|
||||
it.verify(serviceHub, checkSufficientSignatures)
|
||||
|
@ -18,4 +18,5 @@ object PlatformVersionSwitches {
|
||||
const val ENABLE_P2P_COMPRESSION = 7
|
||||
const val RESTRICTED_DATABASE_OPERATIONS = 7
|
||||
const val CERTIFICATE_ROTATION = 9
|
||||
const val TWO_PHASE_FINALITY = 13
|
||||
}
|
@ -21,7 +21,8 @@ class ResolveTransactionsFlow private constructor(
|
||||
val initialTx: SignedTransaction?,
|
||||
val txHashes: Set<SecureHash>,
|
||||
val otherSide: FlowSession,
|
||||
val statesToRecord: StatesToRecord
|
||||
val statesToRecord: StatesToRecord,
|
||||
val deferredAck: Boolean = false
|
||||
) : FlowLogic<Unit>() {
|
||||
|
||||
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
|
||||
@ -36,6 +37,9 @@ class ResolveTransactionsFlow private constructor(
|
||||
constructor(transaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
|
||||
: this(transaction, transaction.dependencies, otherSide, statesToRecord)
|
||||
|
||||
constructor(transaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE, deferredAck: Boolean = false)
|
||||
: this(transaction, transaction.dependencies, otherSide, statesToRecord, deferredAck)
|
||||
|
||||
private var fetchNetParamsFromCounterpart = false
|
||||
|
||||
@Suppress("MagicNumber")
|
||||
@ -60,8 +64,10 @@ class ResolveTransactionsFlow private constructor(
|
||||
val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
|
||||
resolver.downloadDependencies(batchMode)
|
||||
|
||||
logger.trace { "ResolveTransactionsFlow: Sending END." }
|
||||
otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
|
||||
if (!deferredAck) {
|
||||
logger.trace { "ResolveTransactionsFlow: Sending END." }
|
||||
otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
|
||||
}
|
||||
|
||||
// If transaction resolution is performed for a transaction where some states are relevant, then those should be
|
||||
// recorded if this has not already occurred.
|
||||
|
@ -2,10 +2,13 @@ package net.corda.core.internal
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.DeleteForDJVM
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.internal.AttachmentsClassLoaderCache
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import java.util.concurrent.ExecutorService
|
||||
|
||||
// TODO: This should really be called ServiceHubInternal but that name is already taken by net.corda.node.services.api.ServiceHubInternal.
|
||||
@ -24,6 +27,24 @@ interface ServiceHubCoreInternal : ServiceHub {
|
||||
fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver
|
||||
|
||||
val attachmentsClassLoaderCache: AttachmentsClassLoaderCache
|
||||
|
||||
/**
|
||||
* Stores [SignedTransaction] and participant signatures without the notary signature in the local transaction storage.
|
||||
* Optionally add finality flow recovery metadata.
|
||||
* This is expected to be run within a database transaction.
|
||||
*
|
||||
* @param txn The transaction to record.
|
||||
*/
|
||||
fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?= null)
|
||||
|
||||
/**
|
||||
* Stores [SignedTransaction] with extra signatures in the local transaction storage
|
||||
*
|
||||
* @param sigs The signatures to add to the transaction.
|
||||
* @param txn The transactions to record.
|
||||
* @param statesToRecord how the vault should treat the output states of the transaction.
|
||||
*/
|
||||
fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord)
|
||||
}
|
||||
|
||||
interface TransactionsResolver {
|
||||
|
@ -336,8 +336,8 @@ class FlowReloadAfterCheckpointTest {
|
||||
.toSet()
|
||||
.single()
|
||||
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
assertEquals(7, reloads.filter { it == flowStartedByAlice }.size)
|
||||
assertEquals(6, reloads.filter { it == flowStartedByBob }.size)
|
||||
assertEquals(8, reloads.filter { it == flowStartedByAlice }.size)
|
||||
assertEquals(7, reloads.filter { it == flowStartedByBob }.size)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,10 @@ import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.flows.SignTransactionFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.PLATFORM_VERSION
|
||||
import net.corda.core.internal.PlatformVersionSwitches
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.StateMachineUpdate
|
||||
import net.corda.core.messaging.startFlow
|
||||
@ -40,14 +43,22 @@ import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.CustomCordapp
|
||||
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.MOCK_VERSION_INFO
|
||||
import net.corda.testing.node.internal.TestCordappInternal
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import net.corda.testing.node.internal.findCordapp
|
||||
import net.corda.testing.node.testContext
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.Random
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
@ -59,6 +70,11 @@ class FlowHospitalTest {
|
||||
|
||||
private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
|
||||
|
||||
companion object {
|
||||
private val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(),
|
||||
CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
|
||||
}
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow = false
|
||||
@ -238,6 +254,78 @@ class FlowHospitalTest {
|
||||
assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `catching a notary error - two phase finality flow initiator to pre-2PF peer`() {
|
||||
var dischargedCounter = 0
|
||||
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
|
||||
++dischargedCounter
|
||||
}
|
||||
val aliceNode = createNode(ALICE_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
|
||||
val bobNode = createNode(BOB_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
|
||||
|
||||
val handle = aliceNode.services.startFlow(CreateTransactionFlow(bobNode.info.singleIdentity()), testContext())
|
||||
mockNet.runNetwork()
|
||||
val ref = handle.getOrThrow().resultFuture.get()
|
||||
aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
|
||||
aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
|
||||
mockNet.runNetwork()
|
||||
|
||||
// 1 is the notary failing to notarise and propagating the error
|
||||
// 2 is the receiving flow failing due to the unexpected session end error
|
||||
assertEquals(2, dischargedCounter)
|
||||
assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `catching a notary error - pre-2PF initiator to two phase finality flow peer`() {
|
||||
var dischargedCounter = 0
|
||||
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
|
||||
++dischargedCounter
|
||||
}
|
||||
val aliceNode = createNode(ALICE_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
|
||||
val bobNode = createNode(BOB_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
|
||||
|
||||
val handle = aliceNode.services.startFlow(CreateTransactionFlow(bobNode.info.singleIdentity()), testContext())
|
||||
mockNet.runNetwork()
|
||||
val ref = handle.getOrThrow().resultFuture.get()
|
||||
aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
|
||||
aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
|
||||
mockNet.runNetwork()
|
||||
|
||||
// 1 is the notary failing to notarise and propagating the error
|
||||
// 2 is the receiving flow failing due to the unexpected session end error
|
||||
assertEquals(2, dischargedCounter)
|
||||
assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
|
||||
}
|
||||
|
||||
private fun createNode(legalName: CordaX500Name, cordapps: List<TestCordappInternal> = emptyList(), platformVersion: Int = PLATFORM_VERSION): TestStartedNode {
|
||||
return mockNet.createNode(InternalMockNodeParameters(legalName = legalName, additionalCordapps = cordapps,
|
||||
version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion)))
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `old finality flow catching a notary error will cause a peer to fail with unexpected session end during ReceiveFinalityFlow that passes through user code`() {
|
||||
var dischargedCounter = 0
|
||||
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
|
||||
++dischargedCounter
|
||||
}
|
||||
val user = User("mark", "dadada", setOf(Permissions.all()))
|
||||
driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
|
||||
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
nodeAHandle.rpc.let {
|
||||
val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
|
||||
it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
|
||||
it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
|
||||
}
|
||||
}
|
||||
// 1 is the notary failing to notarise and propagating the error
|
||||
// 2 is the receiving flow failing due to the unexpected session end error
|
||||
assertEquals(2, dischargedCounter)
|
||||
assertTrue(SpendStateAndCatchDoubleSpendResponderOldFinalityFlow.exceptionSeenInUserFlow)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `unexpected session end errors outside of ReceiveFinalityFlow are not handled`() {
|
||||
var dischargedCounter = 0
|
||||
@ -483,6 +571,62 @@ class FlowHospitalTest {
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class SpendStateAndCatchDoubleSpendOldFinalityFlow(
|
||||
private val peer: Party,
|
||||
private val ref: StateAndRef<DummyState>,
|
||||
private val consumePeerError: Boolean
|
||||
) : FlowLogic<StateAndRef<DummyState>>() {
|
||||
|
||||
constructor(peer: Party, ref: StateAndRef<DummyState>): this(peer, ref, false)
|
||||
|
||||
@Suspendable
|
||||
override fun call(): StateAndRef<DummyState> {
|
||||
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
|
||||
addInputState(ref)
|
||||
addOutputState(DummyState(participants = listOf(ourIdentity)))
|
||||
addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey, peer.owningKey))
|
||||
}
|
||||
val stx = serviceHub.signInitialTransaction(tx)
|
||||
val session = initiateFlow(peer)
|
||||
session.send(consumePeerError)
|
||||
val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
|
||||
try {
|
||||
subFlow(OldFinalityFlow(ftx, session))
|
||||
} catch(e: NotaryException) {
|
||||
logger.info("Caught notary exception")
|
||||
}
|
||||
return ftx.coreTransaction.outRef(0)
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(SpendStateAndCatchDoubleSpendOldFinalityFlow::class)
|
||||
class SpendStateAndCatchDoubleSpendResponderOldFinalityFlow(private val session: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
companion object {
|
||||
var exceptionSeenInUserFlow = false
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val consumeError = session.receive<Boolean>().unwrap { it }
|
||||
val stx = subFlow(object : SignTransactionFlow(session) {
|
||||
override fun checkTransaction(stx: SignedTransaction) {
|
||||
|
||||
}
|
||||
})
|
||||
try {
|
||||
subFlow(OldReceiveFinalityFlow(session, stx.id))
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
exceptionSeenInUserFlow = true
|
||||
if (!consumeError) {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class CreateTransactionButDontFinalizeFlow(private val peer: Party, private val ref: StateAndRef<DummyState>) : FlowLogic<Unit>() {
|
||||
|
@ -0,0 +1,306 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.CordaInternal
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.isFulfilledBy
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.NotaryException
|
||||
import net.corda.core.flows.NotaryFlow
|
||||
import net.corda.core.flows.ReceiveTransactionFlow
|
||||
import net.corda.core.flows.SendTransactionFlow
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
|
||||
import net.corda.core.internal.telemetry.telemetryServiceInternal
|
||||
import net.corda.core.internal.warnOnce
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.debug
|
||||
|
||||
/**
|
||||
* Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction
|
||||
* is acceptable then it is from that point onwards committed to the ledger, and will be written through to the
|
||||
* vault. Additionally it will be distributed to the parties reflected in the participants list of the states.
|
||||
*
|
||||
* By default, the initiating flow will commit states that are relevant to the initiating party as indicated by
|
||||
* [StatesToRecord.ONLY_RELEVANT]. Relevance is determined by the union of all participants to states which have been
|
||||
* included in the transaction. This default behaviour may be modified by passing in an alternate value for [StatesToRecord].
|
||||
*
|
||||
* The transaction is expected to have already been resolved: if its dependencies are not available in local
|
||||
* storage, verification will fail. It must have signatures from all necessary parties other than the notary.
|
||||
*
|
||||
* A list of [FlowSession]s is required for each non-local participant of the transaction. These participants will receive
|
||||
* the final notarised transaction by calling [ReceiveFinalityFlow] in their counterpart flows. Sessions with non-participants
|
||||
* can also be included, but they must specify [StatesToRecord.ALL_VISIBLE] for statesToRecord if they wish to record the
|
||||
* contract states into their vaults.
|
||||
*
|
||||
* The flow returns the same transaction but with the additional signatures from the notary.
|
||||
*
|
||||
* NOTE: This is an inlined flow but for backwards compatibility is annotated with [InitiatingFlow].
|
||||
*/
|
||||
// To maintain backwards compatibility with the old API, FinalityFlow can act both as an initiating flow and as an inlined flow.
|
||||
// This is only possible because a flow is only truly initiating when the first call to initiateFlow is made (where the
|
||||
// presence of @InitiatingFlow is checked). So the new API is inlined simply because that code path doesn't call initiateFlow.
|
||||
@InitiatingFlow
|
||||
class OldFinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
private val oldParticipants: Collection<Party>,
|
||||
override val progressTracker: ProgressTracker,
|
||||
private val sessions: Collection<FlowSession>,
|
||||
private val newApi: Boolean,
|
||||
private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
@CordaInternal
|
||||
data class ExtraConstructorArgs(val oldParticipants: Collection<Party>, val sessions: Collection<FlowSession>, val newApi: Boolean, val statesToRecord: StatesToRecord)
|
||||
|
||||
@CordaInternal
|
||||
fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord)
|
||||
|
||||
@Deprecated(DEPRECATION_MSG)
|
||||
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>, progressTracker: ProgressTracker) : this(
|
||||
transaction, extraRecipients, progressTracker, emptyList(), false
|
||||
)
|
||||
@Deprecated(DEPRECATION_MSG)
|
||||
constructor(transaction: SignedTransaction, extraRecipients: Set<Party>) : this(transaction, extraRecipients, tracker(), emptyList(), false)
|
||||
@Deprecated(DEPRECATION_MSG)
|
||||
constructor(transaction: SignedTransaction) : this(transaction, emptySet(), tracker(), emptyList(), false)
|
||||
@Deprecated(DEPRECATION_MSG)
|
||||
constructor(transaction: SignedTransaction, progressTracker: ProgressTracker) : this(transaction, emptySet(), progressTracker, emptyList(), false)
|
||||
|
||||
/**
|
||||
* Notarise the given transaction and broadcast it to the given [FlowSession]s. This list **must** at least include
|
||||
* all the non-local participants of the transaction. Sessions to non-participants can also be provided.
|
||||
*
|
||||
* @param transaction What to commit.
|
||||
*/
|
||||
constructor(transaction: SignedTransaction, firstSession: FlowSession, vararg restSessions: FlowSession) : this(
|
||||
transaction, listOf(firstSession) + restSessions.asList()
|
||||
)
|
||||
|
||||
/**
|
||||
* Notarise the given transaction and broadcast it to all the participants.
|
||||
*
|
||||
* @param transaction What to commit.
|
||||
* @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
|
||||
* also be provided.
|
||||
*/
|
||||
@JvmOverloads
|
||||
constructor(
|
||||
transaction: SignedTransaction,
|
||||
sessions: Collection<FlowSession>,
|
||||
progressTracker: ProgressTracker = tracker()
|
||||
) : this(transaction, emptyList(), progressTracker, sessions, true)
|
||||
|
||||
/**
|
||||
* Notarise the given transaction and broadcast it to all the participants.
|
||||
*
|
||||
* @param transaction What to commit.
|
||||
* @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
|
||||
* also be provided.
|
||||
* @param statesToRecord Which states to commit to the vault.
|
||||
*/
|
||||
@JvmOverloads
|
||||
constructor(
|
||||
transaction: SignedTransaction,
|
||||
sessions: Collection<FlowSession>,
|
||||
statesToRecord: StatesToRecord,
|
||||
progressTracker: ProgressTracker = tracker()
|
||||
) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord)
|
||||
|
||||
/**
|
||||
* Notarise the given transaction and broadcast it to all the participants.
|
||||
*
|
||||
* @param transaction What to commit.
|
||||
* @param sessions A collection of [FlowSession]s for each non-local participant.
|
||||
* @param oldParticipants An **optional** collection of parties for participants who are still using the old API.
|
||||
*
|
||||
* You will only need to use this parameter if you have upgraded your CorDapp from the V3 FinalityFlow API but are required to provide
|
||||
* backwards compatibility with participants running V3 nodes. If you're writing a new CorDapp then this does not apply and this
|
||||
* parameter should be ignored.
|
||||
*/
|
||||
@Deprecated(DEPRECATION_MSG)
|
||||
constructor(
|
||||
transaction: SignedTransaction,
|
||||
sessions: Collection<FlowSession>,
|
||||
oldParticipants: Collection<Party>,
|
||||
progressTracker: ProgressTracker
|
||||
) : this(transaction, oldParticipants, progressTracker, sessions, true)
|
||||
|
||||
companion object {
|
||||
private const val DEPRECATION_MSG = "It is unsafe to use this constructor as it requires nodes to automatically " +
|
||||
"accept notarised transactions without first checking their relevancy. Instead, use one of the constructors " +
|
||||
"that requires only FlowSessions."
|
||||
|
||||
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") {
|
||||
override fun childProgressTracker() = NotaryFlow.Client.tracker()
|
||||
}
|
||||
|
||||
object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
|
||||
|
||||
@JvmStatic
|
||||
fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod")
|
||||
@Suspendable
|
||||
@Throws(NotaryException::class)
|
||||
override fun call(): SignedTransaction {
|
||||
if (!newApi) {
|
||||
logger.warnOnce("The current usage of FinalityFlow is unsafe. Please consider upgrading your CorDapp to use " +
|
||||
"FinalityFlow with FlowSessions. (${serviceHub.getAppContext().cordapp.info})")
|
||||
} else {
|
||||
require(sessions.none { serviceHub.myInfo.isLegalIdentity(it.counterparty) }) {
|
||||
"Do not provide flow sessions for the local node. FinalityFlow will record the notarised transaction locally."
|
||||
}
|
||||
}
|
||||
|
||||
// Note: this method is carefully broken up to minimize the amount of data reachable from the stack at
|
||||
// the point where subFlow is invoked, as that minimizes the checkpointing work to be done.
|
||||
//
|
||||
// Lookup the resolved transactions and use them to map each signed transaction to the list of participants.
|
||||
// Then send to the notary if needed, record locally and distribute.
|
||||
|
||||
logCommandData()
|
||||
val ledgerTransaction = verifyTx()
|
||||
val externalTxParticipants = extractExternalParticipants(ledgerTransaction)
|
||||
|
||||
if (newApi) {
|
||||
val sessionParties = sessions.map { it.counterparty }
|
||||
val missingRecipients = externalTxParticipants - sessionParties - oldParticipants
|
||||
require(missingRecipients.isEmpty()) {
|
||||
"Flow sessions were not provided for the following transaction participants: $missingRecipients"
|
||||
}
|
||||
sessionParties.intersect(oldParticipants).let {
|
||||
require(it.isEmpty()) { "The following parties are specified both in flow sessions and in the oldParticipants list: $it" }
|
||||
}
|
||||
}
|
||||
|
||||
val notarised = notariseAndRecord()
|
||||
|
||||
progressTracker.currentStep = BROADCASTING
|
||||
|
||||
if (newApi) {
|
||||
oldV3Broadcast(notarised, oldParticipants.toSet())
|
||||
for (session in sessions) {
|
||||
try {
|
||||
subFlow(SendTransactionFlow(session, notarised))
|
||||
logger.info("Party ${session.counterparty} received the transaction.")
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
oldV3Broadcast(notarised, (externalTxParticipants + oldParticipants).toSet())
|
||||
}
|
||||
|
||||
logger.info("All parties received the transaction successfully.")
|
||||
|
||||
return notarised
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun oldV3Broadcast(notarised: SignedTransaction, recipients: Set<Party>) {
|
||||
for (recipient in recipients) {
|
||||
if (!serviceHub.myInfo.isLegalIdentity(recipient)) {
|
||||
logger.debug { "Sending transaction to party $recipient." }
|
||||
val session = initiateFlow(recipient)
|
||||
subFlow(SendTransactionFlow(session, notarised))
|
||||
logger.info("Party $recipient received the transaction.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun logCommandData() {
|
||||
if (logger.isDebugEnabled) {
|
||||
val commandDataTypes = transaction.tx.commands.asSequence().mapNotNull { it.value::class.qualifiedName }.distinct()
|
||||
logger.debug("Started finalization, commands are ${commandDataTypes.joinToString(", ", "[", "]")}.")
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun notariseAndRecord(): SignedTransaction {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord", flowLogic = this) {
|
||||
val notarised = if (needsNotarySignature(transaction)) {
|
||||
progressTracker.currentStep = NOTARISING
|
||||
val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
|
||||
transaction + notarySignatures
|
||||
} else {
|
||||
logger.info("No need to notarise this transaction.")
|
||||
transaction
|
||||
}
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord:recordTransactions", flowLogic = this) {
|
||||
logger.info("Recording transaction locally.")
|
||||
serviceHub.recordTransactions(statesToRecord, listOf(notarised))
|
||||
logger.info("Recorded transaction locally successfully.")
|
||||
}
|
||||
return notarised
|
||||
}
|
||||
}
|
||||
|
||||
private fun needsNotarySignature(stx: SignedTransaction): Boolean {
|
||||
val wtx = stx.tx
|
||||
val needsNotarisation = wtx.inputs.isNotEmpty() || wtx.references.isNotEmpty() || wtx.timeWindow != null
|
||||
return needsNotarisation && hasNoNotarySignature(stx)
|
||||
}
|
||||
|
||||
private fun hasNoNotarySignature(stx: SignedTransaction): Boolean {
|
||||
val notaryKey = stx.tx.notary?.owningKey
|
||||
val signers = stx.sigs.asSequence().map { it.by }.toSet()
|
||||
return notaryKey?.isFulfilledBy(signers) != true
|
||||
}
|
||||
|
||||
private fun extractExternalParticipants(ltx: LedgerTransaction): Set<Party> {
|
||||
val participants = ltx.outputStates.flatMap { it.participants } + ltx.inputStates.flatMap { it.participants }
|
||||
return groupAbstractPartyByWellKnownParty(serviceHub, participants).keys - serviceHub.myInfo.legalIdentities
|
||||
}
|
||||
|
||||
private fun verifyTx(): LedgerTransaction {
|
||||
val notary = transaction.tx.notary
|
||||
// The notary signature(s) are allowed to be missing but no others.
|
||||
if (notary != null) transaction.verifySignaturesExcept(notary.owningKey) else transaction.verifyRequiredSignatures()
|
||||
// TODO= [CORDA-3267] Remove duplicate signature verification
|
||||
val ltx = transaction.toLedgerTransaction(serviceHub, false)
|
||||
ltx.verify()
|
||||
return ltx
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The receiving counterpart to [FinalityFlow].
|
||||
*
|
||||
* All parties who are receiving a finalised transaction from a sender flow must subcall this flow in their own flows.
|
||||
*
|
||||
* It's typical to have already signed the transaction proposal in the same workflow using [SignTransactionFlow]. If so
|
||||
* then the transaction ID can be passed in as an extra check to ensure the finalised transaction is the one that was signed
|
||||
* before it's committed to the vault.
|
||||
*
|
||||
* @param otherSideSession The session which is providing the transaction to record.
|
||||
* @param expectedTxId Expected ID of the transaction that's about to be received. This is typically retrieved from
|
||||
* [SignTransactionFlow]. Setting it to null disables the expected transaction ID check.
|
||||
* @param statesToRecord Which states to commit to the vault. Defaults to [StatesToRecord.ONLY_RELEVANT].
|
||||
*/
|
||||
class OldReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
|
||||
private val expectedTxId: SecureHash? = null,
|
||||
private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
return subFlow(object : ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = true, statesToRecord = statesToRecord) {
|
||||
override fun checkBeforeRecording(stx: SignedTransaction) {
|
||||
require(expectedTxId == null || expectedTxId == stx.id) {
|
||||
"We expected to receive transaction with ID $expectedTxId but instead got ${stx.id}. Transaction was" +
|
||||
"not recorded and nor its states sent to the vault."
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -39,7 +39,6 @@ import org.assertj.core.api.Assertions
|
||||
import org.junit.After
|
||||
import org.junit.Assert
|
||||
import org.junit.Test
|
||||
import java.lang.IllegalStateException
|
||||
import java.sql.SQLException
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
@ -422,9 +421,10 @@ class VaultObserverExceptionTest {
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
|
||||
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
|
||||
* counterparty node.
|
||||
* Two Phase Finality update:
|
||||
* This causes the transaction to not be finalised on the local node but the notary still records the transaction as spent. The transaction
|
||||
* does finalize at the counterparty node since the notary signatures are broadcast to peers prior to initiator node finalisation.
|
||||
* Subscriber events will occur on the counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
|
||||
*
|
||||
@ -487,29 +487,32 @@ class VaultObserverExceptionTest {
|
||||
println("First set of flows")
|
||||
val stateId = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
// Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
|
||||
assertEquals(1, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
assertEquals(1, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
|
||||
println("Second set of flows")
|
||||
val stateId2 = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
// Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
|
||||
assertEquals(1, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(2, observationCounter)
|
||||
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
assertEquals(2, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
|
||||
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
|
||||
* counterparty node.
|
||||
* Two Phase Finality update:
|
||||
* This causes the transaction to not be finalised on the local node but the notary still records the transaction as spent. The transaction
|
||||
* does finalize at the counterparty node since the notary signatures are broadcast to peers prior to initiator node finalisation.
|
||||
* Subscriber events will occur on the counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
|
||||
*
|
||||
@ -578,19 +581,21 @@ class VaultObserverExceptionTest {
|
||||
|
||||
val stateId = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
// Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
|
||||
assertEquals(1, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
assertEquals(1, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
|
||||
val stateId2 = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
// Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
|
||||
assertEquals(1, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(2, observationCounter)
|
||||
assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
assertEquals(2, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
}
|
||||
}
|
||||
|
||||
@ -681,9 +686,10 @@ class VaultObserverExceptionTest {
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
|
||||
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
|
||||
* counterparty node.
|
||||
* Two Phase Finality update:
|
||||
* This causes the transaction to not be finalised on the local node but the notary still records the transaction as spent. The transaction
|
||||
* does finalize at the counterparty node since the notary signatures are broadcast to peers prior to initiator node finalisation.
|
||||
* Subscriber events will occur on the counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe.
|
||||
*
|
||||
@ -743,19 +749,21 @@ class VaultObserverExceptionTest {
|
||||
val stateId = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
// Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
|
||||
assertEquals(1, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
assertEquals(1, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
|
||||
val stateId2 = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
// Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
|
||||
assertEquals(1, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
assertEquals(2, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.services
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.internal.FetchTransactionsFlow
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.TransactionsResolver
|
||||
@ -101,10 +102,10 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
|
||||
for (txId in sortedDependencies) {
|
||||
// Retrieve and delete the transaction from the unverified store.
|
||||
val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
|
||||
val (tx, txStatus) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
|
||||
"Somehow the unverified transaction ($txId) that we stored previously is no longer there."
|
||||
}
|
||||
if (!isVerified) {
|
||||
if (txStatus != TransactionStatus.VERIFIED) {
|
||||
tx.verify(flow.serviceHub)
|
||||
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
||||
} else {
|
||||
|
@ -3,10 +3,19 @@ package net.corda.node.services.api
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.internal.FlowStateMachineHandle
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.ServiceHubCoreInternal
|
||||
import net.corda.core.internal.TransactionsResolver
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.dependencies
|
||||
import net.corda.core.internal.requireSupportedHashType
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.StateMachineTransactionMapping
|
||||
import net.corda.core.node.NodeInfo
|
||||
@ -15,6 +24,7 @@ import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkMapCacheBase
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.internal.cordapp.CordappProviderInternal
|
||||
@ -27,7 +37,11 @@ import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
import java.util.ArrayList
|
||||
import java.util.Collections
|
||||
import java.util.HashMap
|
||||
import java.util.HashSet
|
||||
import java.util.LinkedHashSet
|
||||
|
||||
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase {
|
||||
override val nodeReady: OpenFuture<Void?>
|
||||
@ -63,12 +77,15 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
return sort.complete()
|
||||
}
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun recordTransactions(statesToRecord: StatesToRecord,
|
||||
txs: Collection<SignedTransaction>,
|
||||
validatedTransactions: WritableTransactionStorage,
|
||||
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
|
||||
vaultService: VaultServiceInternal,
|
||||
database: CordaPersistence) {
|
||||
database: CordaPersistence,
|
||||
updateFn: (SignedTransaction) -> Boolean = validatedTransactions::addTransaction
|
||||
) {
|
||||
|
||||
database.transaction {
|
||||
require(txs.isNotEmpty()) { "No transactions passed in for recording" }
|
||||
@ -79,9 +96,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
// for transactions being recorded at ONLY_RELEVANT, if this transaction has been seen before its outputs should already
|
||||
// have been recorded at ONLY_RELEVANT, so there shouldn't be anything to re-record here.
|
||||
val (recordedTransactions, previouslySeenTxs) = if (statesToRecord != StatesToRecord.ALL_VISIBLE) {
|
||||
orderedTxs.filter(validatedTransactions::addTransaction) to emptyList()
|
||||
orderedTxs.filter(updateFn) to emptyList()
|
||||
} else {
|
||||
orderedTxs.partition(validatedTransactions::addTransaction)
|
||||
orderedTxs.partition(updateFn)
|
||||
}
|
||||
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
|
||||
if (stateMachineRunId != null) {
|
||||
@ -129,6 +146,22 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction })
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("LongParameterList")
|
||||
fun finalizeTransactionWithExtraSignatures(statesToRecord: StatesToRecord,
|
||||
txn: SignedTransaction,
|
||||
sigs: Collection<TransactionSignature>,
|
||||
validatedTransactions: WritableTransactionStorage,
|
||||
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
|
||||
vaultService: VaultServiceInternal,
|
||||
database: CordaPersistence) {
|
||||
database.transaction {
|
||||
require(sigs.isNotEmpty()) { "No signatures passed in for recording" }
|
||||
recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) {
|
||||
validatedTransactions.finalizeTransactionWithExtraSignatures(it, sigs)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val attachments: AttachmentStorageInternal
|
||||
@ -156,7 +189,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
val cacheFactory: NamedCacheFactory
|
||||
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
txs.forEach { requireSupportedHashType(it) }
|
||||
txs.forEach {
|
||||
requireSupportedHashType(it)
|
||||
}
|
||||
recordTransactions(
|
||||
statesToRecord,
|
||||
txs as? Collection ?: txs.toList(), // We can't change txs to a Collection as it's now part of the public API
|
||||
@ -167,6 +202,32 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
)
|
||||
}
|
||||
|
||||
override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {
|
||||
requireSupportedHashType(txn)
|
||||
if (txn.coreTransaction is WireTransaction)
|
||||
(txn + sigs).verifyRequiredSignatures()
|
||||
finalizeTransactionWithExtraSignatures(
|
||||
statesToRecord,
|
||||
txn,
|
||||
sigs,
|
||||
validatedTransactions,
|
||||
stateMachineRecordedTransactionMapping,
|
||||
vaultService,
|
||||
database
|
||||
)
|
||||
}
|
||||
|
||||
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {
|
||||
if (txn.coreTransaction is WireTransaction) {
|
||||
txn.notary?.let { notary ->
|
||||
txn.verifySignaturesExcept(notary.owningKey)
|
||||
} ?: txn.verifyRequiredSignatures()
|
||||
}
|
||||
database.transaction {
|
||||
validatedTransactions.addUnnotarisedTransaction(txn, metadata)
|
||||
}
|
||||
}
|
||||
|
||||
override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver = DbTransactionsResolver(flow)
|
||||
|
||||
/**
|
||||
@ -253,16 +314,33 @@ interface WritableTransactionStorage : TransactionStorage {
|
||||
// TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
|
||||
fun addTransaction(transaction: SignedTransaction): Boolean
|
||||
|
||||
/**
|
||||
* Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG*.
|
||||
* Optionally add finality flow recovery metadata.
|
||||
* @param transaction The transaction to be recorded.
|
||||
* @param metadata Finality flow recovery metadata.
|
||||
* @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
|
||||
*/
|
||||
fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata? = null): Boolean
|
||||
|
||||
/**
|
||||
* Update a previously un-notarised transaction including associated notary signatures.
|
||||
* @param transaction The notarised transaction to be finalized.
|
||||
* @param signatures The notary signatures.
|
||||
* @return true if the transaction is recorded as a *finalized* transaction, false if the transaction already exists.
|
||||
*/
|
||||
fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) : Boolean
|
||||
|
||||
/**
|
||||
* Add a new *unverified* transaction to the store.
|
||||
*/
|
||||
fun addUnverifiedTransaction(transaction: SignedTransaction)
|
||||
|
||||
/**
|
||||
* Return the transaction with the given ID from the store, and a flag of whether it's verified. Returns null if no transaction with the
|
||||
* ID exists.
|
||||
* Return the transaction with the given ID from the store, and its associated [TransactionStatus].
|
||||
* Returns null if no transaction with the ID exists.
|
||||
*/
|
||||
fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>?
|
||||
fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>?
|
||||
|
||||
/**
|
||||
* Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside
|
||||
|
@ -3,17 +3,26 @@ package net.corda.node.services.persistence
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.NamedCacheFactory
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.CoreTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.CordaClock
|
||||
@ -21,22 +30,35 @@ import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMapBase
|
||||
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.persistence.*
|
||||
import java.util.Collections
|
||||
import javax.persistence.AttributeConverter
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Convert
|
||||
import javax.persistence.Converter
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.Lob
|
||||
import javax.persistence.Table
|
||||
import kotlin.streams.toList
|
||||
|
||||
@Suppress("TooManyFunctions")
|
||||
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory,
|
||||
private val clock: CordaClock) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||
|
||||
@Suppress("MagicNumber") // database column width
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
|
||||
class DBTransaction(
|
||||
data class DBTransaction(
|
||||
@Id
|
||||
@Column(name = "tx_id", length = 144, nullable = false)
|
||||
val txId: String,
|
||||
@ -53,17 +75,41 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
val status: TransactionStatus,
|
||||
|
||||
@Column(name = "timestamp", nullable = false)
|
||||
val timestamp: Instant
|
||||
)
|
||||
val timestamp: Instant,
|
||||
|
||||
@Column(name = "signatures")
|
||||
val signatures: ByteArray?,
|
||||
|
||||
/**
|
||||
* Flow finality metadata used for recovery
|
||||
* TODO: create association table solely for Flow metadata and recovery purposes.
|
||||
* See https://r3-cev.atlassian.net/browse/ENT-9521
|
||||
*/
|
||||
|
||||
/** X500Name of flow initiator **/
|
||||
@Column(name = "initiator")
|
||||
val initiator: String? = null,
|
||||
|
||||
/** X500Name of flow participant parties **/
|
||||
@Column(name = "participants")
|
||||
@Convert(converter = StringListConverter::class)
|
||||
val participants: List<String>? = null,
|
||||
|
||||
/** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
|
||||
@Column(name = "states_to_record")
|
||||
val statesToRecord: StatesToRecord? = null
|
||||
)
|
||||
|
||||
enum class TransactionStatus {
|
||||
UNVERIFIED,
|
||||
VERIFIED;
|
||||
VERIFIED,
|
||||
MISSING_NOTARY_SIG;
|
||||
|
||||
fun toDatabaseValue(): String {
|
||||
return when (this) {
|
||||
UNVERIFIED -> "U"
|
||||
VERIFIED -> "V"
|
||||
MISSING_NOTARY_SIG -> "M"
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,11 +117,20 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
return this == VERIFIED
|
||||
}
|
||||
|
||||
fun toTransactionStatus(): net.corda.core.flows.TransactionStatus {
|
||||
return when(this) {
|
||||
UNVERIFIED -> net.corda.core.flows.TransactionStatus.UNVERIFIED
|
||||
VERIFIED -> net.corda.core.flows.TransactionStatus.VERIFIED
|
||||
MISSING_NOTARY_SIG -> net.corda.core.flows.TransactionStatus.MISSING_NOTARY_SIG
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun fromDatabaseValue(databaseValue: String): TransactionStatus {
|
||||
return when (databaseValue) {
|
||||
"V" -> VERIFIED
|
||||
"U" -> UNVERIFIED
|
||||
"M" -> MISSING_NOTARY_SIG
|
||||
else -> throw UnexpectedStatusValueException(databaseValue)
|
||||
}
|
||||
}
|
||||
@ -95,6 +150,21 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
}
|
||||
|
||||
@Converter
|
||||
class StringListConverter : AttributeConverter<List<String>?, String?> {
|
||||
override fun convertToDatabaseColumn(stringList: List<String>?): String? {
|
||||
return stringList?.let { if (it.isEmpty()) null else it.joinToString(SPLIT_CHAR) }
|
||||
}
|
||||
|
||||
override fun convertToEntityAttribute(string: String?): List<String>? {
|
||||
return string?.split(SPLIT_CHAR)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val SPLIT_CHAR = ";"
|
||||
}
|
||||
}
|
||||
|
||||
internal companion object {
|
||||
const val TRANSACTION_ALREADY_IN_PROGRESS_WARNING = "trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions."
|
||||
|
||||
@ -107,7 +177,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
|
||||
private val logger = contextLogger()
|
||||
|
||||
private fun contextToUse(): SerializationContext {
|
||||
fun contextToUse(): SerializationContext {
|
||||
return if (effectiveSerializationEnv.serializationFactory.currentContext?.useCase == SerializationContext.UseCase.Storage) {
|
||||
effectiveSerializationEnv.serializationFactory.currentContext!!
|
||||
} else {
|
||||
@ -121,10 +191,19 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
cacheFactory = cacheFactory,
|
||||
name = "DBTransactionStorage_transactions",
|
||||
toPersistentEntityKey = SecureHash::toString,
|
||||
fromPersistentEntity = {
|
||||
SecureHash.create(it.txId) to TxCacheValue(
|
||||
it.transaction.deserialize(context = contextToUse()),
|
||||
it.status)
|
||||
fromPersistentEntity = { dbTxn ->
|
||||
SecureHash.create(dbTxn.txId) to TxCacheValue(
|
||||
dbTxn.transaction.deserialize(context = contextToUse()),
|
||||
dbTxn.status,
|
||||
dbTxn.signatures?.deserialize(context = contextToUse()),
|
||||
dbTxn.initiator?.let { initiator ->
|
||||
FlowTransactionMetadata(
|
||||
CordaX500Name.parse(initiator),
|
||||
dbTxn.statesToRecord!!,
|
||||
dbTxn.participants?.let { it.map { CordaX500Name.parse(it) }.toSet() }
|
||||
)
|
||||
}
|
||||
)
|
||||
},
|
||||
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
|
||||
DBTransaction(
|
||||
@ -132,7 +211,11 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString(),
|
||||
transaction = value.toSignedTx().serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
|
||||
status = value.status,
|
||||
timestamp = clock.instant()
|
||||
timestamp = clock.instant(),
|
||||
signatures = value.sigs.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
|
||||
statesToRecord = value.metadata?.statesToRecord,
|
||||
initiator = value.metadata?.initiator?.toString(),
|
||||
participants = value.metadata?.peers?.map { it.toString() }
|
||||
)
|
||||
},
|
||||
persistentEntityClass = DBTransaction::class.java,
|
||||
@ -158,19 +241,43 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
|
||||
criteriaUpdate.where(criteriaBuilder.and(
|
||||
criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
|
||||
criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.UNVERIFIED)
|
||||
))
|
||||
criteriaBuilder.and(updateRoot.get<TransactionStatus>(DBTransaction::status.name).`in`(setOf(TransactionStatus.UNVERIFIED, TransactionStatus.MISSING_NOTARY_SIG))
|
||||
)))
|
||||
criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
|
||||
val update = session.createQuery(criteriaUpdate)
|
||||
val rowsUpdated = update.executeUpdate()
|
||||
return rowsUpdated != 0
|
||||
}
|
||||
|
||||
override fun addTransaction(transaction: SignedTransaction): Boolean {
|
||||
override fun addTransaction(transaction: SignedTransaction) =
|
||||
addTransaction(transaction) {
|
||||
updateTransaction(transaction.id)
|
||||
}
|
||||
|
||||
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?) =
|
||||
database.transaction {
|
||||
txStorage.locked {
|
||||
val cacheValue = TxCacheValue(transaction, status = TransactionStatus.MISSING_NOTARY_SIG, metadata = metadata)
|
||||
val added = addWithDuplicatesAllowed(transaction.id, cacheValue)
|
||||
if (added) {
|
||||
logger.info ("Transaction ${transaction.id} recorded as un-notarised.")
|
||||
} else {
|
||||
logger.info("Transaction ${transaction.id} (un-notarised) already exists so no need to record.")
|
||||
}
|
||||
added
|
||||
}
|
||||
}
|
||||
|
||||
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) =
|
||||
addTransaction(transaction + signatures) {
|
||||
finalizeTransactionWithExtraSignatures(transaction.id, signatures)
|
||||
}
|
||||
|
||||
private fun addTransaction(transaction: SignedTransaction, updateFn: (SecureHash) -> Boolean): Boolean {
|
||||
return database.transaction {
|
||||
txStorage.locked {
|
||||
val cachedValue = TxCacheValue(transaction, TransactionStatus.VERIFIED)
|
||||
val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateTransaction(k) }
|
||||
val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateFn(k) }
|
||||
if (addedOrUpdated) {
|
||||
logger.debug { "Transaction ${transaction.id} has been recorded as verified" }
|
||||
onNewTx(transaction)
|
||||
@ -182,6 +289,40 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
}
|
||||
|
||||
private fun finalizeTransactionWithExtraSignatures(txId: SecureHash, signatures: Collection<TransactionSignature>): Boolean {
|
||||
return txStorage.locked {
|
||||
val session = currentDBSession()
|
||||
val criteriaBuilder = session.criteriaBuilder
|
||||
val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(DBTransaction::class.java)
|
||||
val updateRoot = criteriaUpdate.from(DBTransaction::class.java)
|
||||
criteriaUpdate.set(updateRoot.get<ByteArray>(DBTransaction::signatures.name), signatures.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes)
|
||||
criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
|
||||
criteriaUpdate.where(criteriaBuilder.and(
|
||||
criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
|
||||
criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG)
|
||||
))
|
||||
criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
|
||||
val update = session.createQuery(criteriaUpdate)
|
||||
val rowsUpdated = update.executeUpdate()
|
||||
if (rowsUpdated == 0) {
|
||||
// indicates race condition whereby ReceiverFinality MISSING_NOTARY_SIG overwritten to UNVERIFIED by ResolveTransactionsFlow (in follow-up txn)
|
||||
// TO-DO: ensure unverified txn signatures are validated prior to recording (https://r3-cev.atlassian.net/browse/ENT-9566)
|
||||
val criteriaUpdateUnverified = criteriaBuilder.createCriteriaUpdate(DBTransaction::class.java)
|
||||
val updateRootUnverified = criteriaUpdateUnverified.from(DBTransaction::class.java)
|
||||
criteriaUpdateUnverified.set(updateRootUnverified.get<ByteArray>(DBTransaction::signatures.name), signatures.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes)
|
||||
criteriaUpdateUnverified.set(updateRootUnverified.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
|
||||
criteriaUpdateUnverified.where(criteriaBuilder.and(
|
||||
criteriaBuilder.equal(updateRootUnverified.get<String>(DBTransaction::txId.name), txId.toString()),
|
||||
criteriaBuilder.equal(updateRootUnverified.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.UNVERIFIED)
|
||||
))
|
||||
criteriaUpdateUnverified.set(updateRootUnverified.get<Instant>(DBTransaction::timestamp.name), clock.instant())
|
||||
val updateUnverified = session.createQuery(criteriaUpdateUnverified)
|
||||
val rowsUpdatedUnverified = updateUnverified.executeUpdate()
|
||||
rowsUpdatedUnverified != 0
|
||||
} else true
|
||||
}
|
||||
}
|
||||
|
||||
private fun onNewTx(transaction: SignedTransaction): Boolean {
|
||||
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
|
||||
return true
|
||||
@ -194,10 +335,18 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
|
||||
override fun addUnverifiedTransaction(transaction: SignedTransaction) {
|
||||
if (transaction.coreTransaction is WireTransaction)
|
||||
transaction.verifyRequiredSignatures()
|
||||
database.transaction {
|
||||
txStorage.locked {
|
||||
val cacheValue = TxCacheValue(transaction, status = TransactionStatus.UNVERIFIED)
|
||||
val added = addWithDuplicatesAllowed(transaction.id, cacheValue)
|
||||
val added = addWithDuplicatesAllowed(transaction.id, cacheValue) { k, v, existingEntry ->
|
||||
if (existingEntry.status == TransactionStatus.MISSING_NOTARY_SIG) {
|
||||
// TODO verify signatures on passed in transaction include notary (See https://r3-cev.atlassian.net/browse/ENT-9566))
|
||||
session.merge(toPersistentEntity(k, v))
|
||||
true
|
||||
} else false
|
||||
}
|
||||
if (added) {
|
||||
logger.debug { "Transaction ${transaction.id} recorded as unverified." }
|
||||
} else {
|
||||
@ -207,9 +356,9 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
}
|
||||
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? {
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, net.corda.core.flows.TransactionStatus>? {
|
||||
return database.transaction {
|
||||
txStorage.content[id]?.let { it.toSignedTx() to it.status.isVerified() }
|
||||
txStorage.content[id]?.let { it.toSignedTx() to it.status.toTransactionStatus() }
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,16 +418,30 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
|
||||
}
|
||||
|
||||
// Cache value type to just store the immutable bits of a signed transaction plus conversion helpers
|
||||
private data class TxCacheValue(
|
||||
private class TxCacheValue(
|
||||
val txBits: SerializedBytes<CoreTransaction>,
|
||||
val sigs: List<TransactionSignature>,
|
||||
val status: TransactionStatus
|
||||
val status: TransactionStatus,
|
||||
// flow metadata recorded for recovery
|
||||
val metadata: FlowTransactionMetadata? = null
|
||||
) {
|
||||
constructor(stx: SignedTransaction, status: TransactionStatus) : this(
|
||||
stx.txBits,
|
||||
Collections.unmodifiableList(stx.sigs),
|
||||
status)
|
||||
|
||||
status
|
||||
)
|
||||
constructor(stx: SignedTransaction, status: TransactionStatus, metadata: FlowTransactionMetadata?) : this(
|
||||
stx.txBits,
|
||||
Collections.unmodifiableList(stx.sigs),
|
||||
status,
|
||||
metadata
|
||||
)
|
||||
constructor(stx: SignedTransaction, status: TransactionStatus, sigs: List<TransactionSignature>?, metadata: FlowTransactionMetadata?) : this(
|
||||
stx.txBits,
|
||||
if (sigs == null) Collections.unmodifiableList(stx.sigs) else Collections.unmodifiableList(stx.sigs + sigs).distinct(),
|
||||
status,
|
||||
metadata
|
||||
)
|
||||
fun toSignedTx() = SignedTransaction(txBits, sigs)
|
||||
}
|
||||
}
|
||||
|
@ -469,6 +469,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
}
|
||||
|
||||
object FinalityDoctor : Staff {
|
||||
@Suppress("ComplexMethod")
|
||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||
return if (currentState.flowLogic is FinalityHandler) {
|
||||
log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
|
||||
@ -480,10 +481,18 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
// no need to keep around the flow, since notarisation has already failed at the counterparty.
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinalityFlow(newError) -> {
|
||||
// no need to keep around the flow, since notarisation has already failed at the counterparty.
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
isEndSessionErrorThrownDuringReceiveTransactionFlow(newError) -> {
|
||||
// Typically occurs if the initiating flow catches a notary exception and ends their flow successfully.
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
isEndSessionErrorThrownDuringReceiveFinalityFlow(newError) -> {
|
||||
// Typically occurs if the initiating flow catches a notary exception and ends their flow successfully.
|
||||
Diagnosis.NOT_MY_SPECIALTY
|
||||
}
|
||||
else -> {
|
||||
log.warn(
|
||||
"Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
|
||||
@ -530,6 +539,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
&& strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!)
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will return true if [ReceiveFinalityFlow] is at the top of the stack during the error.
|
||||
* This may happen in the post-notarisation logic of Two Phase Finality upon receiving a notarisation exception
|
||||
* from the peer running [FinalityFlow].
|
||||
*/
|
||||
private fun isErrorThrownDuringReceiveFinalityFlow(error: Throwable): Boolean {
|
||||
val strippedStacktrace = error.stackTrace
|
||||
.filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
|
||||
.filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
|
||||
return strippedStacktrace.isNotEmpty()
|
||||
&& strippedStacktrace.first().className.startsWith(ReceiveFinalityFlow::class.qualifiedName!!)
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if an end session error exception was thrown and that it did so within [ReceiveTransactionFlow].
|
||||
*
|
||||
@ -542,6 +564,15 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
||||
&& error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true
|
||||
&& isErrorThrownDuringReceiveTransactionFlow(error)
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if an end session error exception was thrown and that it did so within [ReceiveFinalityFlow].
|
||||
*/
|
||||
private fun isEndSessionErrorThrownDuringReceiveFinalityFlow(error: Throwable): Boolean {
|
||||
return error is UnexpectedFlowEndException
|
||||
&& error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true
|
||||
&& isErrorThrownDuringReceiveFinalityFlow(error)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -142,18 +142,22 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
|
||||
* Associates the specified value with the specified key in this map and persists it.
|
||||
* If the map previously contained a committed mapping for the key, the old value is not replaced. It may throw an error from the
|
||||
* underlying storage if this races with another database transaction to store a value for the same key.
|
||||
* An optional [forceUpdate] function allows performing additional checks/updates on an existingEntry to determine whether the map
|
||||
* should be updated.
|
||||
* @return true if added key was unique, otherwise false
|
||||
*/
|
||||
fun addWithDuplicatesAllowed(key: K, value: V, logWarning: Boolean = true): Boolean {
|
||||
fun addWithDuplicatesAllowed(key: K, value: V, logWarning: Boolean = true,
|
||||
forceUpdate: (K, V, E) -> Boolean = { _, _, _ -> false }): Boolean {
|
||||
return set(key, value, logWarning) { k, v ->
|
||||
val session = currentDBSession()
|
||||
val existingEntry = session.find(persistentEntityClass, toPersistentEntityKey(k))
|
||||
if (existingEntry == null) {
|
||||
session.save(toPersistentEntity(k, v))
|
||||
null
|
||||
} else {
|
||||
fromPersistentEntity(existingEntry).second
|
||||
}
|
||||
else if (!forceUpdate(key, value, existingEntry)) {
|
||||
fromPersistentEntity(existingEntry).second
|
||||
} else null
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,7 +28,8 @@
|
||||
<include file="migration/node-core.changelog-v16.xml"/>
|
||||
<include file="migration/node-core.changelog-v20.xml"/>
|
||||
<include file="migration/node-core.changelog-v22.xml"/>
|
||||
|
||||
<include file="migration/node-core.changelog-v23.xml"/>
|
||||
<include file="migration/node-core.changelog-v24.xml"/>
|
||||
<!-- This must run after node-core.changelog-init.xml, to prevent database columns being created twice. -->
|
||||
<include file="migration/vault-schema.changelog-v9.xml"/>
|
||||
|
||||
|
@ -0,0 +1,12 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
|
||||
logicalFilePath="migration/node-services.changelog-init.xml">
|
||||
|
||||
<changeSet author="R3.Corda" id="add_signatures_column">
|
||||
<addColumn tableName="node_transactions">
|
||||
<column name="signatures" type="VARBINARY(33554432)"/>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
@ -0,0 +1,25 @@
|
||||
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
|
||||
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
|
||||
logicalFilePath="migration/node-services.changelog-init.xml">
|
||||
|
||||
<changeSet author="R3.Corda" id="add_flow_metadata_columns">
|
||||
<addColumn tableName="node_transactions">
|
||||
<column name="initiator" type="NVARCHAR(128)">
|
||||
<constraints nullable="true"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
<addColumn tableName="node_transactions">
|
||||
<column name="participants" type="NVARCHAR(1280)">
|
||||
<constraints nullable="true"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
<addColumn tableName="node_transactions">
|
||||
<column name="states_to_record" type="INT">
|
||||
<constraints nullable="true"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
@ -2,9 +2,26 @@ package net.corda.node.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.InsufficientBalanceException
|
||||
import net.corda.core.contracts.Issued
|
||||
import net.corda.core.contracts.OwnableState
|
||||
import net.corda.core.contracts.PartyAndReference
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignableData
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
@ -38,14 +55,26 @@ import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.BOC_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.core.expect
|
||||
import net.corda.testing.core.expectEvents
|
||||
import net.corda.testing.core.sequence
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.dsl.LedgerDSL
|
||||
import net.corda.testing.dsl.TestLedgerDSLInterpreter
|
||||
import net.corda.testing.dsl.TestTransactionDSLInterpreter
|
||||
import net.corda.testing.internal.IS_OPENJ9
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.vault.VaultFiller
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import net.corda.testing.node.ledger
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
@ -56,7 +85,11 @@ import org.junit.runner.RunWith
|
||||
import org.junit.runners.Parameterized
|
||||
import rx.Observable
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.util.*
|
||||
import java.util.ArrayList
|
||||
import java.util.Collections
|
||||
import java.util.Currency
|
||||
import java.util.Random
|
||||
import java.util.UUID
|
||||
import java.util.jar.JarOutputStream
|
||||
import java.util.zip.ZipEntry
|
||||
import kotlin.streams.toList
|
||||
@ -139,7 +172,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
|
||||
// TODO: Verify that the result was inserted into the transaction database.
|
||||
// assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id])
|
||||
assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow())
|
||||
assertEquals(aliceResult.getOrThrow().id, (bobStateMachine.getOrThrow().resultFuture.getOrThrow() as SignedTransaction).id)
|
||||
|
||||
aliceNode.dispose()
|
||||
bobNode.dispose()
|
||||
@ -285,7 +318,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
mockNet.runNetwork()
|
||||
|
||||
// Bob is now finished and has the same transaction as Alice.
|
||||
assertThat(bobFuture.getOrThrow()).isEqualTo(aliceFuture.getOrThrow())
|
||||
assertThat((bobFuture.getOrThrow() as SignedTransaction).id).isEqualTo((aliceFuture.getOrThrow().id))
|
||||
|
||||
assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty()
|
||||
bobNode.database.transaction {
|
||||
@ -768,6 +801,21 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
return true
|
||||
}
|
||||
|
||||
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean {
|
||||
database.transaction {
|
||||
records.add(TxRecord.Add(transaction))
|
||||
delegate.addUnnotarisedTransaction(transaction)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) : Boolean {
|
||||
database.transaction {
|
||||
delegate.finalizeTransactionWithExtraSignatures(transaction, signatures)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
override fun addUnverifiedTransaction(transaction: SignedTransaction) {
|
||||
database.transaction {
|
||||
delegate.addUnverifiedTransaction(transaction)
|
||||
@ -781,11 +829,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
|
||||
}
|
||||
}
|
||||
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? {
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? {
|
||||
return database.transaction {
|
||||
delegate.getTransactionInternal(id)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
interface TxRecord {
|
||||
|
@ -212,7 +212,8 @@ class VaultStateMigrationTest {
|
||||
stateMachineRunId = null,
|
||||
transaction = tx.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes,
|
||||
status = DBTransactionStorage.TransactionStatus.VERIFIED,
|
||||
timestamp = Instant.now()
|
||||
timestamp = Instant.now(),
|
||||
signatures = null
|
||||
)
|
||||
session.save(persistentTx)
|
||||
}
|
||||
|
@ -1,21 +1,32 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import junit.framework.TestCase.assertNotNull
|
||||
import junit.framework.TestCase.assertTrue
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.SignableData
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.sign
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.MutableClock
|
||||
import net.corda.node.SimpleClock
|
||||
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.MISSING_NOTARY_SIG
|
||||
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.UNVERIFIED
|
||||
import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.VERIFIED
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
@ -32,17 +43,21 @@ import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import rx.plugins.RxJavaHooks
|
||||
import java.security.KeyPair
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.Semaphore
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class DBTransactionStorageTests {
|
||||
private companion object {
|
||||
val ALICE_PUBKEY = TestIdentity(ALICE_NAME, 70).publicKey
|
||||
val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party
|
||||
val ALICE = TestIdentity(ALICE_NAME, 70)
|
||||
val BOB_PARTY = TestIdentity(BOB_NAME, 80).party
|
||||
val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20)
|
||||
}
|
||||
|
||||
@Rule
|
||||
@ -90,6 +105,140 @@ class DBTransactionStorageTests {
|
||||
assertEquals(now, readTransactionTimestampFromDB(transaction.id))
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `create transaction missing notary signature and validate status in db`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addUnnotarisedTransaction(transaction)
|
||||
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `create un-notarised transaction with flow metadata and validate status in db`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name, StatesToRecord.ALL_VISIBLE, setOf(BOB_PARTY.name)))
|
||||
val txn = readTransactionFromDB(transaction.id)
|
||||
assertEquals(MISSING_NOTARY_SIG, txn.status)
|
||||
assertEquals(StatesToRecord.ALL_VISIBLE, txn.statesToRecord)
|
||||
assertEquals(ALICE_NAME.toString(), txn.initiator)
|
||||
assertEquals(listOf(BOB_NAME.toString()), txn.participants)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `finalize transaction with no prior recording of un-notarised transaction`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig(transaction.id)))
|
||||
readTransactionFromDB(transaction.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, transaction.sigs)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `finalize transaction with extra signatures after recording transaction as un-notarised`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction(notarySig = false)
|
||||
transactionStorage.addUnnotarisedTransaction(transaction)
|
||||
assertNull(transactionStorage.getTransaction(transaction.id))
|
||||
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status)
|
||||
val notarySig = notarySig(transaction.id)
|
||||
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig))
|
||||
readTransactionFromDB(transaction.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, transaction.sigs + notarySig)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `finalize unverified transaction and verify no additional signatures are added`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addUnverifiedTransaction(transaction)
|
||||
assertNull(transactionStorage.getTransaction(transaction.id))
|
||||
assertEquals(UNVERIFIED, readTransactionFromDB(transaction.id).status)
|
||||
// attempt to finalise with another notary signature
|
||||
transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig(transaction.id)))
|
||||
readTransactionFromDB(transaction.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, transaction.sigs)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `simulate finalize race condition where first transaction trumps follow-up transaction`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transactionWithoutNotarySig = newTransaction(notarySig = false)
|
||||
|
||||
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
|
||||
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig)
|
||||
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySig.id).status)
|
||||
|
||||
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
|
||||
val notarySig = notarySig(transactionWithoutNotarySig.id)
|
||||
transactionStorage.addUnverifiedTransaction(transactionWithoutNotarySig + notarySig)
|
||||
assertEquals(UNVERIFIED, readTransactionFromDB(transactionWithoutNotarySig.id).status)
|
||||
|
||||
// txn finalised with notary signatures (even though in UNVERIFIED state)
|
||||
assertTrue(transactionStorage.finalizeTransactionWithExtraSignatures(transactionWithoutNotarySig, listOf(notarySig)))
|
||||
readTransactionFromDB(transactionWithoutNotarySig.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, transactionWithoutNotarySig.sigs + notarySig)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
|
||||
// attempt to record follow-up txn
|
||||
assertFalse(transactionStorage.addTransaction(transactionWithoutNotarySig + notarySig))
|
||||
readTransactionFromDB(transactionWithoutNotarySig.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, transactionWithoutNotarySig.sigs + notarySig)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `simulate finalize race condition where follow-up transaction races ahead of initial transaction`() {
|
||||
val now = Instant.ofEpochSecond(333444555L)
|
||||
val transactionClock = TransactionClock(now)
|
||||
newTransactionStorage(clock = transactionClock)
|
||||
val transactionWithoutNotarySigs = newTransaction(notarySig = false)
|
||||
|
||||
// txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
|
||||
transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs)
|
||||
assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
|
||||
|
||||
// txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
|
||||
val notarySig = notarySig(transactionWithoutNotarySigs.id)
|
||||
val transactionWithNotarySigs = transactionWithoutNotarySigs + notarySig
|
||||
transactionStorage.addUnverifiedTransaction(transactionWithNotarySigs)
|
||||
assertEquals(UNVERIFIED, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
|
||||
|
||||
// txn then recorded as verified (simulate ResolveTransactions recording in follow-up flow)
|
||||
assertTrue(transactionStorage.addTransaction(transactionWithNotarySigs))
|
||||
readTransactionFromDB(transactionWithoutNotarySigs.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, expectedSigs = transactionWithNotarySigs.sigs)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
|
||||
// attempt to finalise original txn
|
||||
assertFalse(transactionStorage.finalizeTransactionWithExtraSignatures(transactionWithoutNotarySigs, listOf(notarySig)))
|
||||
readTransactionFromDB(transactionWithoutNotarySigs.id).let {
|
||||
assertSignatures(it.transaction, it.signatures, expectedSigs = transactionWithNotarySigs.sigs)
|
||||
assertEquals(VERIFIED, it.status)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `create unverified then verified transaction and validate timestamps in db`() {
|
||||
val unverifiedTime = Instant.ofEpochSecond(555666777L)
|
||||
@ -175,6 +324,17 @@ class DBTransactionStorageTests {
|
||||
return fromDb[0].timestamp
|
||||
}
|
||||
|
||||
private fun readTransactionFromDB(id: SecureHash): DBTransactionStorage.DBTransaction {
|
||||
val fromDb = database.transaction {
|
||||
session.createQuery(
|
||||
"from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId",
|
||||
DBTransactionStorage.DBTransaction::class.java
|
||||
).setParameter("transactionId", id.toString()).resultList.map { it }
|
||||
}
|
||||
assertEquals(1, fromDb.size)
|
||||
return fromDb[0]
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `empty store`() {
|
||||
assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
|
||||
@ -369,7 +529,7 @@ class DBTransactionStorageTests {
|
||||
|
||||
// Assert
|
||||
|
||||
assertThat(result).isNotNull()
|
||||
assertThat(result).isNotNull
|
||||
assertThat(result?.get(20, TimeUnit.SECONDS)?.id).isEqualTo(signedTransaction.id)
|
||||
}
|
||||
|
||||
@ -399,18 +559,36 @@ class DBTransactionStorageTests {
|
||||
assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction)
|
||||
}
|
||||
|
||||
private fun newTransaction(): SignedTransaction {
|
||||
private fun newTransaction(notarySig: Boolean = true): SignedTransaction {
|
||||
val wtx = createWireTransaction(
|
||||
inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)),
|
||||
attachments = emptyList(),
|
||||
outputs = emptyList(),
|
||||
commands = listOf(dummyCommand()),
|
||||
notary = DUMMY_NOTARY,
|
||||
commands = listOf(dummyCommand(ALICE.publicKey)),
|
||||
notary = DUMMY_NOTARY.party,
|
||||
timeWindow = null
|
||||
)
|
||||
return SignedTransaction(
|
||||
wtx,
|
||||
listOf(TransactionSignature(ByteArray(1), ALICE_PUBKEY, SignatureMetadata(1, Crypto.findSignatureScheme(ALICE_PUBKEY).schemeNumberID)))
|
||||
)
|
||||
return makeSigned(wtx, ALICE.keyPair, notarySig = notarySig)
|
||||
}
|
||||
|
||||
private fun makeSigned(wtx: WireTransaction, vararg keys: KeyPair, notarySig: Boolean = true): SignedTransaction {
|
||||
val keySigs = keys.map { it.sign(SignableData(wtx.id, SignatureMetadata(1, Crypto.findSignatureScheme(it.public).schemeNumberID))) }
|
||||
val sigs = if (notarySig) {
|
||||
keySigs + notarySig(wtx.id)
|
||||
} else {
|
||||
keySigs
|
||||
}
|
||||
return SignedTransaction(wtx, sigs)
|
||||
}
|
||||
|
||||
private fun notarySig(txId: SecureHash) =
|
||||
DUMMY_NOTARY.keyPair.sign(SignableData(txId, SignatureMetadata(1, Crypto.findSignatureScheme(DUMMY_NOTARY.publicKey).schemeNumberID)))
|
||||
|
||||
private fun assertSignatures(transaction: ByteArray, extraSigs: ByteArray?,
|
||||
expectedSigs: List<TransactionSignature>) {
|
||||
assertNotNull(extraSigs)
|
||||
assertEquals(expectedSigs,
|
||||
(transaction.deserialize<SignedTransaction>(context = DBTransactionStorage.contextToUse()).sigs +
|
||||
extraSigs!!.deserialize<List<TransactionSignature>>()).distinct())
|
||||
}
|
||||
}
|
||||
|
@ -319,7 +319,7 @@ class RetryFlowMockTest {
|
||||
|
||||
private fun doInsert() {
|
||||
val tx = DBTransactionStorage.DBTransaction("Foo", null, Utils.EMPTY_BYTES,
|
||||
DBTransactionStorage.TransactionStatus.VERIFIED, Instant.now())
|
||||
DBTransactionStorage.TransactionStatus.VERIFIED, Instant.now(), null)
|
||||
contextTransaction.session.save(tx)
|
||||
}
|
||||
}
|
||||
|
@ -13,27 +13,47 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.PLATFORM_VERSION
|
||||
import net.corda.core.internal.requireSupportedHashType
|
||||
import net.corda.core.internal.telemetry.TelemetryComponent
|
||||
import net.corda.core.internal.telemetry.TelemetryServiceImpl
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.StateMachineTransactionMapping
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.NetworkParameters
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.ContractUpgradeService
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.node.services.KeyManagementService
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkParametersService
|
||||
import net.corda.core.node.services.ServiceLifecycleObserver
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.diagnostics.DiagnosticsService
|
||||
import net.corda.core.internal.telemetry.TelemetryComponent
|
||||
import net.corda.core.internal.telemetry.TelemetryServiceImpl
|
||||
import net.corda.core.node.services.vault.CordaTransactionSupport
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.coretesting.internal.DEV_ROOT_CA
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.ServicesForResolutionImpl
|
||||
import net.corda.node.internal.cordapp.JarScanningCordappLoader
|
||||
import net.corda.node.services.api.*
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
|
||||
import net.corda.node.services.api.VaultServiceInternal
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.diagnostics.NodeDiagnosticsService
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.BasicHSMKeyManagementService
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.persistence.PublicKeyToOwningIdentityCacheImpl
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
@ -44,12 +64,16 @@ import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.coretesting.internal.DEV_ROOT_CA
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.testing.internal.MockCordappProvider
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.internal.*
|
||||
import net.corda.testing.node.internal.DriverDSLImpl
|
||||
import net.corda.testing.node.internal.MockCryptoService
|
||||
import net.corda.testing.node.internal.MockKeyManagementService
|
||||
import net.corda.testing.node.internal.MockNetworkParametersStorage
|
||||
import net.corda.testing.node.internal.MockTransactionStorage
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import net.corda.testing.node.internal.getCallerPackage
|
||||
import net.corda.testing.services.MockAttachmentStorage
|
||||
import java.io.ByteArrayOutputStream
|
||||
import java.nio.file.Paths
|
||||
@ -57,7 +81,7 @@ import java.security.KeyPair
|
||||
import java.sql.Connection
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.Properties
|
||||
import java.util.function.Consumer
|
||||
import java.util.jar.JarFile
|
||||
import java.util.zip.ZipEntry
|
||||
|
@ -569,12 +569,15 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
|
||||
val allActiveFlows = allNodes.flatMap { it.smm.snapshot() }
|
||||
|
||||
return allActiveFlows.any {
|
||||
val flowState = it.snapshot().checkpoint.flowState
|
||||
flowState is FlowState.Started && when (flowState.flowIORequest) {
|
||||
is FlowIORequest.ExecuteAsyncOperation -> true
|
||||
is FlowIORequest.Sleep -> true
|
||||
else -> false
|
||||
}
|
||||
val flowSnapshot = it.snapshot()
|
||||
if (!flowSnapshot.isFlowResumed && flowSnapshot.isWaitingForFuture) {
|
||||
val flowState = flowSnapshot.checkpoint.flowState
|
||||
flowState is FlowState.Started && when (flowState.flowIORequest) {
|
||||
is FlowIORequest.ExecuteAsyncOperation -> true
|
||||
is FlowIORequest.Sleep -> true
|
||||
else -> false
|
||||
}
|
||||
} else false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,12 +2,15 @@ package net.corda.testing.node.internal
|
||||
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.flows.TransactionStatus
|
||||
import net.corda.testing.node.MockServices
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -42,11 +45,23 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
|
||||
}
|
||||
|
||||
override fun addTransaction(transaction: SignedTransaction): Boolean {
|
||||
val current = txns.putIfAbsent(transaction.id, TxHolder(transaction, isVerified = true))
|
||||
val current = txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED))
|
||||
return if (current == null) {
|
||||
notify(transaction)
|
||||
} else if (!current.isVerified) {
|
||||
current.isVerified = true
|
||||
notify(transaction)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean {
|
||||
return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.MISSING_NOTARY_SIG)) == null
|
||||
}
|
||||
|
||||
override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>): Boolean {
|
||||
val current = txns.replace(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED))
|
||||
return if (current != null) {
|
||||
notify(transaction)
|
||||
} else {
|
||||
false
|
||||
@ -54,12 +69,14 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
|
||||
}
|
||||
|
||||
override fun addUnverifiedTransaction(transaction: SignedTransaction) {
|
||||
txns.putIfAbsent(transaction.id, TxHolder(transaction, isVerified = false))
|
||||
txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.UNVERIFIED))
|
||||
}
|
||||
|
||||
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.isVerified) it.stx else null }
|
||||
override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.status == TransactionStatus.VERIFIED) it.stx else null }
|
||||
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? = txns[id]?.let { Pair(it.stx, it.isVerified) }
|
||||
override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? = txns[id]?.let { Pair(it.stx, it.status) }
|
||||
|
||||
private class TxHolder(val stx: SignedTransaction, var isVerified: Boolean)
|
||||
private class TxHolder(val stx: SignedTransaction, var status: TransactionStatus) {
|
||||
val isVerified = status == TransactionStatus.VERIFIED
|
||||
}
|
||||
}
|
@ -6,12 +6,15 @@ import net.corda.core.contracts.*
|
||||
import net.corda.core.cordapp.CordappProvider
|
||||
import net.corda.core.crypto.NullKeys.NULL_SIGNATURE
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowTransactionMetadata
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.node.services.TransactionStorage
|
||||
import net.corda.core.serialization.internal.AttachmentsClassLoaderCache
|
||||
@ -134,6 +137,10 @@ data class TestTransactionDSLInterpreter private constructor(
|
||||
override val notaryService: NotaryService? = null
|
||||
|
||||
override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory())
|
||||
|
||||
override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {}
|
||||
|
||||
override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {}
|
||||
}
|
||||
|
||||
private fun copy(): TestTransactionDSLInterpreter =
|
||||
|
Loading…
x
Reference in New Issue
Block a user