[CORDA-2402] Ensure out of order transactions result in correct vault state (#4559)

* Pass states to record through to transaction resolution

* Add a test case

* Add comment indicating why states are always added in tx resolution

* Update observer node documentation
This commit is contained in:
JamesHR3 2019-01-13 15:03:53 +00:00 committed by GitHub
parent 7a4b6b3e44
commit 36cd9b9791
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 134 additions and 10 deletions

View File

@ -43,7 +43,7 @@ open class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSid
it.pushToLoggingContext()
logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
checkParameterHash(it.networkParametersHash)
subFlow(ResolveTransactionsFlow(it, otherSideSession))
subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord))
logger.info("Transaction dependencies resolution completed.")
try {
it.verify(serviceHub, checkSufficientSignatures)

View File

@ -22,7 +22,9 @@ import kotlin.math.min
* Each retrieved transaction is validated and inserted into the local transaction storage.
*/
@DeleteForDJVM
class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>, private val otherSide: FlowSession) : FlowLogic<Unit>() {
class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
private val otherSide: FlowSession,
private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic<Unit>() {
// Need it ordered in terms of iteration. Needs to be a variable for the check-pointing logic to work.
private val txHashes = txHashesArg.toList()
@ -37,6 +39,10 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>, private val otherSid
this.signedTransaction = signedTransaction
}
constructor(signedTransaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord) : this(dependencyIDs(signedTransaction), otherSide, statesToRecord) {
this.signedTransaction = signedTransaction
}
@DeleteForDJVM
companion object {
private fun dependencyIDs(stx: SignedTransaction) = stx.inputs.map { it.txhash }.toSet() + stx.references.map { it.txhash }.toSet()
@ -84,13 +90,16 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>, private val otherSid
// Finish fetching data.
val result = topologicalSort(newTxns)
// If transaction resolution is performed for a transaction where some states are relevant, then those should be
// recorded if this has not already occurred.
val usedStatesToRecord = if (statesToRecord == StatesToRecord.NONE) StatesToRecord.ONLY_RELEVANT else statesToRecord
result.forEach {
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
// half way through, it's no big deal, although it might result in us attempting to re-download data
// redundantly next time we attempt verification.
it.verify(serviceHub)
serviceHub.recordTransactions(StatesToRecord.NONE, listOf(it))
serviceHub.recordTransactions(usedStatesToRecord, listOf(it))
}
}

View File

@ -47,8 +47,8 @@ Caveats
later re-record the same transaction as an observer. This issue is tracked here:
https://r3-cev.atlassian.net/browse/CORDA-883
* Observer nodes will only record the states of the transactions sent to them, and not any states from any previous
transactions in the chain. If the observer node is required to follow the creation and deletion of states, then each
transaction in the chain involving those states must be sent individually. This is because the observer node does not
necessarily have any visibility into the states of intermediate transactions, and so cannot always determine whether
a previous state has been consumed when a new transaction is received.
* When an observer node is sent a transaction with the ALL_VISIBLE flag set, any transactions in the transaction history
that have not already been received will also have ALL_VISIBLE states recorded. This mean a node that is both an observer
and a participant may have some transactions with all states recorded and some with only relevant states recorded, even
if those transactions are part of the same chain. As a result, there may be more states present in the vault than would be
expected if just those transactions sent with the ALL_VISIBLE recording flag were processed in this way.

View File

@ -15,8 +15,8 @@ import javax.persistence.Table
@CordaSerializable
data class MessageData(val value: String)
data class MessageChainState(val message: MessageData, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier()) : LinearState, QueryableState {
override val participants: List<AbstractParty> = listOf(by)
data class MessageChainState(val message: MessageData, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier(), val extraParty: Party? = null) : LinearState, QueryableState {
override val participants: List<AbstractParty> = if (extraParty == null) listOf(by) else listOf(by, extraParty)
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return when (schema) {

View File

@ -0,0 +1,115 @@
package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Command
import net.corda.core.contracts.StateAndContract
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.cordappWithPackages
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
class TransactionOrderingTests {
private lateinit var mockNet: InternalMockNetwork
@Before
fun start() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = listOf(cordappWithPackages(MessageChainState::class.packageName)),
networkSendManuallyPumped = false,
threadPerNode = true)
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `Out of order transactions are recorded in vault correctly`() {
val alice = mockNet.createPartyNode(ALICE_NAME)
val aliceID = alice.info.identityFromX500Name(ALICE_NAME)
val bob = mockNet.createPartyNode(BOB_NAME)
val bobID = bob.info.identityFromX500Name(BOB_NAME)
bob.registerInitiatedFlow(ReceiveTx::class.java)
val notary = mockNet.defaultNotaryNode
val notaryID = mockNet.defaultNotaryIdentity
fun signTx(txBuilder: TransactionBuilder): SignedTransaction {
val first = alice.services.signInitialTransaction(txBuilder)
val second = bob.services.addSignature(first)
return notary.services.addSignature(second)
}
val state1 = MessageChainState(MessageData("A"), aliceID, extraParty = bobID)
val command = Command(MessageChainContract.Commands.Send(), state1.participants.map {it.owningKey})
val tx1Builder = TransactionBuilder(notaryID).withItems(
StateAndContract(state1, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID),
command)
val stx1 = signTx(tx1Builder)
val state2 = MessageChainState(MessageData("AA"), aliceID, state1.linearId, extraParty = bobID)
val tx2Builder = TransactionBuilder(notaryID).withItems(
StateAndContract(state2, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID),
command,
StateAndRef(stx1.coreTransaction.outputs[0], StateRef(stx1.coreTransaction.id, 0))
)
val stx2 = signTx(tx2Builder)
val state3 = MessageChainState(MessageData("AAA"), aliceID, state1.linearId, extraParty = bobID)
val tx3Builder = TransactionBuilder(notaryID).withItems(
StateAndContract(state3, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID),
command,
StateAndRef(stx2.coreTransaction.outputs[0], StateRef(stx2.coreTransaction.id, 0))
)
val stx3 = signTx(tx3Builder)
alice.services.recordTransactions(listOf(stx1, stx2, stx3))
alice.services.startFlow(SendTx(bobID, stx3)).resultFuture.getOrThrow()
alice.services.startFlow(SendTx(bobID, stx1)).resultFuture.getOrThrow()
alice.services.startFlow(SendTx(bobID, stx2)).resultFuture.getOrThrow()
val queryCriteria = QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL)
val bobStates = bob.services.vaultService.queryBy(MessageChainState::class.java, queryCriteria)
assertEquals(3, bobStates.states.size)
}
}
@InitiatingFlow
@StartableByRPC
class SendTx(private val party: Party,
private val stx: SignedTransaction) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
subFlow(SendTransactionFlow(session, stx))
session.receive<Unit>()
}
}
@InitiatedBy(SendTx::class)
class ReceiveTx(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ONLY_RELEVANT))
otherSideSession.send(Unit)
}
}