From 36cd9b979189e338b61215938db5c01e1a1b0087 Mon Sep 17 00:00:00 2001 From: JamesHR3 <45565019+JamesHR3@users.noreply.github.com> Date: Sun, 13 Jan 2019 15:03:53 +0000 Subject: [PATCH] [CORDA-2402] Ensure out of order transactions result in correct vault state (#4559) * Pass states to record through to transaction resolution * Add a test case * Add comment indicating why states are always added in tx resolution * Update observer node documentation --- .../core/flows/ReceiveTransactionFlow.kt | 2 +- .../core/internal/ResolveTransactionsFlow.kt | 13 +- docs/source/tutorial-observer-nodes.rst | 10 +- .../services/persistence/MessageChainState.kt | 4 +- .../persistence/TransactionOrderingTests.kt | 115 ++++++++++++++++++ 5 files changed, 134 insertions(+), 10 deletions(-) create mode 100644 node/src/test/kotlin/net/corda/node/services/persistence/TransactionOrderingTests.kt diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt index 189e19ad75..41b61ff15e 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt @@ -43,7 +43,7 @@ open class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSid it.pushToLoggingContext() logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.") checkParameterHash(it.networkParametersHash) - subFlow(ResolveTransactionsFlow(it, otherSideSession)) + subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord)) logger.info("Transaction dependencies resolution completed.") try { it.verify(serviceHub, checkSufficientSignatures) diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt index 2fd98ab265..5816266db2 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt @@ -22,7 +22,9 @@ import kotlin.math.min * Each retrieved transaction is validated and inserted into the local transaction storage. */ @DeleteForDJVM -class ResolveTransactionsFlow(txHashesArg: Set, private val otherSide: FlowSession) : FlowLogic() { +class ResolveTransactionsFlow(txHashesArg: Set, + private val otherSide: FlowSession, + private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic() { // Need it ordered in terms of iteration. Needs to be a variable for the check-pointing logic to work. private val txHashes = txHashesArg.toList() @@ -37,6 +39,10 @@ class ResolveTransactionsFlow(txHashesArg: Set, private val otherSid this.signedTransaction = signedTransaction } + constructor(signedTransaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord) : this(dependencyIDs(signedTransaction), otherSide, statesToRecord) { + this.signedTransaction = signedTransaction + } + @DeleteForDJVM companion object { private fun dependencyIDs(stx: SignedTransaction) = stx.inputs.map { it.txhash }.toSet() + stx.references.map { it.txhash }.toSet() @@ -84,13 +90,16 @@ class ResolveTransactionsFlow(txHashesArg: Set, private val otherSid // Finish fetching data. val result = topologicalSort(newTxns) + // If transaction resolution is performed for a transaction where some states are relevant, then those should be + // recorded if this has not already occurred. + val usedStatesToRecord = if (statesToRecord == StatesToRecord.NONE) StatesToRecord.ONLY_RELEVANT else statesToRecord result.forEach { // For each transaction, verify it and insert it into the database. As we are iterating over them in a // depth-first order, we should not encounter any verification failures due to missing data. If we fail // half way through, it's no big deal, although it might result in us attempting to re-download data // redundantly next time we attempt verification. it.verify(serviceHub) - serviceHub.recordTransactions(StatesToRecord.NONE, listOf(it)) + serviceHub.recordTransactions(usedStatesToRecord, listOf(it)) } } diff --git a/docs/source/tutorial-observer-nodes.rst b/docs/source/tutorial-observer-nodes.rst index 984230d7e4..9d3cdcfe75 100644 --- a/docs/source/tutorial-observer-nodes.rst +++ b/docs/source/tutorial-observer-nodes.rst @@ -47,8 +47,8 @@ Caveats later re-record the same transaction as an observer. This issue is tracked here: https://r3-cev.atlassian.net/browse/CORDA-883 -* Observer nodes will only record the states of the transactions sent to them, and not any states from any previous - transactions in the chain. If the observer node is required to follow the creation and deletion of states, then each - transaction in the chain involving those states must be sent individually. This is because the observer node does not - necessarily have any visibility into the states of intermediate transactions, and so cannot always determine whether - a previous state has been consumed when a new transaction is received. +* When an observer node is sent a transaction with the ALL_VISIBLE flag set, any transactions in the transaction history + that have not already been received will also have ALL_VISIBLE states recorded. This mean a node that is both an observer + and a participant may have some transactions with all states recorded and some with only relevant states recorded, even + if those transactions are part of the same chain. As a result, there may be more states present in the vault than would be + expected if just those transactions sent with the ALL_VISIBLE recording flag were processed in this way. diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/MessageChainState.kt b/node/src/test/kotlin/net/corda/node/services/persistence/MessageChainState.kt index a81f796756..37dfdbb61e 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/MessageChainState.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/MessageChainState.kt @@ -15,8 +15,8 @@ import javax.persistence.Table @CordaSerializable data class MessageData(val value: String) -data class MessageChainState(val message: MessageData, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier()) : LinearState, QueryableState { - override val participants: List = listOf(by) +data class MessageChainState(val message: MessageData, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier(), val extraParty: Party? = null) : LinearState, QueryableState { + override val participants: List = if (extraParty == null) listOf(by) else listOf(by, extraParty) override fun generateMappedObject(schema: MappedSchema): PersistentState { return when (schema) { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/TransactionOrderingTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/TransactionOrderingTests.kt new file mode 100644 index 0000000000..507c72bd03 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/persistence/TransactionOrderingTests.kt @@ -0,0 +1,115 @@ +package net.corda.node.services.persistence + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.Command +import net.corda.core.contracts.StateAndContract +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.StateRef +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.internal.packageName +import net.corda.core.node.StatesToRecord +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.cordappWithPackages +import net.corda.testing.node.internal.startFlow +import org.junit.After +import org.junit.Before +import org.junit.Test +import kotlin.test.assertEquals + +class TransactionOrderingTests { + private lateinit var mockNet: InternalMockNetwork + + @Before + fun start() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = listOf(cordappWithPackages(MessageChainState::class.packageName)), + networkSendManuallyPumped = false, + threadPerNode = true) + } + + @After + fun cleanUp() { + mockNet.stopNodes() + } + + @Test + fun `Out of order transactions are recorded in vault correctly`() { + val alice = mockNet.createPartyNode(ALICE_NAME) + val aliceID = alice.info.identityFromX500Name(ALICE_NAME) + + val bob = mockNet.createPartyNode(BOB_NAME) + val bobID = bob.info.identityFromX500Name(BOB_NAME) + bob.registerInitiatedFlow(ReceiveTx::class.java) + + val notary = mockNet.defaultNotaryNode + val notaryID = mockNet.defaultNotaryIdentity + + fun signTx(txBuilder: TransactionBuilder): SignedTransaction { + val first = alice.services.signInitialTransaction(txBuilder) + val second = bob.services.addSignature(first) + return notary.services.addSignature(second) + } + + val state1 = MessageChainState(MessageData("A"), aliceID, extraParty = bobID) + val command = Command(MessageChainContract.Commands.Send(), state1.participants.map {it.owningKey}) + val tx1Builder = TransactionBuilder(notaryID).withItems( + StateAndContract(state1, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID), + command) + val stx1 = signTx(tx1Builder) + + val state2 = MessageChainState(MessageData("AA"), aliceID, state1.linearId, extraParty = bobID) + val tx2Builder = TransactionBuilder(notaryID).withItems( + StateAndContract(state2, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID), + command, + StateAndRef(stx1.coreTransaction.outputs[0], StateRef(stx1.coreTransaction.id, 0)) + ) + val stx2 = signTx(tx2Builder) + + val state3 = MessageChainState(MessageData("AAA"), aliceID, state1.linearId, extraParty = bobID) + val tx3Builder = TransactionBuilder(notaryID).withItems( + StateAndContract(state3, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID), + command, + StateAndRef(stx2.coreTransaction.outputs[0], StateRef(stx2.coreTransaction.id, 0)) + ) + val stx3 = signTx(tx3Builder) + + alice.services.recordTransactions(listOf(stx1, stx2, stx3)) + + alice.services.startFlow(SendTx(bobID, stx3)).resultFuture.getOrThrow() + alice.services.startFlow(SendTx(bobID, stx1)).resultFuture.getOrThrow() + alice.services.startFlow(SendTx(bobID, stx2)).resultFuture.getOrThrow() + + val queryCriteria = QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL) + val bobStates = bob.services.vaultService.queryBy(MessageChainState::class.java, queryCriteria) + assertEquals(3, bobStates.states.size) + } +} + +@InitiatingFlow +@StartableByRPC +class SendTx(private val party: Party, + private val stx: SignedTransaction) : FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(party) + subFlow(SendTransactionFlow(session, stx)) + session.receive() + } +} + +@InitiatedBy(SendTx::class) +class ReceiveTx(private val otherSideSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ONLY_RELEVANT)) + otherSideSession.send(Unit) + } +} \ No newline at end of file