From 075f68f179d85ca7458eab8333760a38d50e1b70 Mon Sep 17 00:00:00 2001 From: James Higgs <45565019+JamesHR3@users.noreply.github.com> Date: Thu, 4 Jul 2019 14:00:42 +0100 Subject: [PATCH] [CORDA-2086] Allow transactions to be re-recorded using StatesToRecord.ALL_VISIBLE (#5184) --- .../core/flows/ReferencedStatesFlowTests.kt | 67 ++++- docs/source/changelog.rst | 7 + docs/source/tutorial-observer-nodes.rst | 8 +- .../node/services/api/ServiceHubInternal.kt | 13 +- .../node/services/api/VaultServiceInternal.kt | 7 +- .../node/services/vault/NodeVaultService.kt | 71 ++++-- .../ObserverNodeTransactionTests.kt | 239 +++++++++++++++--- .../corda/node/testing/MessageChainState.kt | 17 +- 8 files changed, 349 insertions(+), 80 deletions(-) diff --git a/core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt index b64b60be49..d85ed222b1 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ReferencedStatesFlowTests.kt @@ -8,7 +8,6 @@ import net.corda.core.node.StatesToRecord import net.corda.core.node.services.Vault import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria -import net.corda.core.toFuture import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder @@ -32,7 +31,7 @@ class ReferencedStatesFlowTests { @Before fun setup() { - nodes = (0..1).map { + nodes = (0..2).map { mockNet.createNode( parameters = InternalMockNodeParameters(version = VersionInfo(4, "Blah", "Blah", "Blah")) ) @@ -77,7 +76,7 @@ class ReferencedStatesFlowTests { // 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state. val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow() // Wait until node 1 stores the new tx. - nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() + nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow() // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0]. assertEquals(2, nodes[1].services.vaultService.queryBy().states.size) @@ -108,7 +107,7 @@ class ReferencedStatesFlowTests { // 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state. val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow() // Wait until node 1 stores the new tx. - nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() + nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow() // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). val allRefStates = nodes[1].services.vaultService.queryBy() // nodes[1] should have two states. The newly created output and the reference state created by nodes[0]. @@ -125,7 +124,7 @@ class ReferencedStatesFlowTests { val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)) .resultFuture.getOrThrow() // Wait until node 1 stores the new tx. - nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() + nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow() // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0]. assertEquals(2, nodes[1].services.vaultService.queryBy().states.size) @@ -144,13 +143,13 @@ class ReferencedStatesFlowTests { assertEquals(Vault.StateStatus.UNCONSUMED, theReferencedStateOnNodeZero.statesMetadata.single().status) // 3. Update the reference state but don't share the update. - val updatedRefTx = nodes[0].services.startFlow(UpdateRefState(newRefState)).resultFuture.getOrThrow() + nodes[0].services.startFlow(UpdateRefState(newRefState)).resultFuture.getOrThrow() // 4. Use the evolved state as a reference state. val updatedTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)) .resultFuture.getOrThrow() // Wait until node 1 stores the new tx. - nodes[1].services.validatedTransactions.updates.filter { it.id == updatedTx.id }.toFuture().getOrThrow() + nodes[1].services.validatedTransactions.trackTransaction(updatedTx.id).getOrThrow() // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // nodes[1] should have four states. The originals, plus the newly created output of type "Regular.State" and the reference state created by nodes[0]. assertEquals(4, nodes[1].services.vaultService.queryBy(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size) @@ -167,6 +166,37 @@ class ReferencedStatesFlowTests { assertEquals(Vault.StateStatus.CONSUMED, theOriginalReferencedStateOnNodeZero.statesMetadata.single().status) } + @Test + fun `check consumed reference state is found if a transaction refers to it`() { + // 1. Create a state to be used as a reference state. Don't share it. + val newRefTx = nodes[0].services.startFlow(CreateRefState()).resultFuture.getOrThrow() + val newRefState = newRefTx.tx.outRefsOfType().single() + + // 2. Use the "newRefState" in a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state. + val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)) + .resultFuture.getOrThrow() + + // Wait until node 1 stores the new tx. + nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow() + // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). + // nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0]. + assertEquals(2, nodes[1].services.vaultService.queryBy().states.size) + + // 3. Update the reference state but don't share the update. + val updatedRefTx = nodes[0].services.startFlow(UpdateRefState(newRefState)).resultFuture.getOrThrow() + + // 4. Now report the transactions that created the two reference states to a third party. + nodes[0].services.startFlow(ReportTransactionFlow(nodes[2].info.legalIdentities.first(), newRefTx)).resultFuture.getOrThrow() + nodes[0].services.startFlow(ReportTransactionFlow(nodes[2].info.legalIdentities.first(), updatedRefTx)).resultFuture.getOrThrow() + // Check that there are two linear states in the vault (note that one is consumed) + assertEquals(2, nodes[2].services.vaultService.queryBy(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size) + + // 5. Report the transaction that uses the consumed reference state + nodes[0].services.startFlow(ReportTransactionFlow(nodes[2].info.legalIdentities.first(), newTx)).resultFuture.getOrThrow() + // There should be 3 linear states in the vault + assertEquals(3, nodes[2].services.vaultService.queryBy(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size) + } + // A dummy reference state contract. class RefState : Contract { companion object { @@ -284,4 +314,27 @@ class ReferencedStatesFlowTests { return subFlow(ReceiveFinalityFlow(otherSession, statesToRecord = StatesToRecord.ONLY_RELEVANT)) } } + + // A flow to report a transaction to a third party. + @InitiatingFlow + @StartableByRPC + class ReportTransactionFlow(private val reportee: Party, + private val signedTx: SignedTransaction) : FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(reportee) + subFlow(SendTransactionFlow(session, signedTx)) + session.receive() + } + } + + @InitiatedBy(ReportTransactionFlow::class) + class ReceiveReportedTransactionFlow(private val otherSideSession: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE)) + otherSideSession.send(Unit) + } + } } diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 6f90613fe7..2a8ab4f66b 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -20,6 +20,13 @@ Version 5.0 ``log`` directory. This zip will contain a JSON representation of each checkpointed flow. This information can then be used to determine the state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention. +* It is now possible to re-record transactions if a node wishes to record as an observer a transaction it has participated in. If this is + done, then the node may record new output states that are not relevant to the node. + +.. warning:: Nodes may re-record transactions if they have previously recorded them as a participant and wish to record them as an observer. + However, the node cannot resolve the forward chain of transactions if this is done. This means that if you wish to re-record a chain of + transactions and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*. + .. _changelog_v4.2: Version 4.2 diff --git a/docs/source/tutorial-observer-nodes.rst b/docs/source/tutorial-observer-nodes.rst index 3e70892897..1ec8ddc564 100644 --- a/docs/source/tutorial-observer-nodes.rst +++ b/docs/source/tutorial-observer-nodes.rst @@ -43,12 +43,12 @@ Caveats participant/owner. See https://docs.corda.net/api-vault-query.html#example-usage for information on how to do this. This also means that ``Cash.generateSpend`` should not be used when recording ``Cash.State`` states as an observer -* Nodes only record each transaction once. If a node has already recorded a transaction in non-observer mode, it cannot - later re-record the same transaction as an observer. This issue is tracked here: - https://r3-cev.atlassian.net/browse/CORDA-883 - * When an observer node is sent a transaction with the ALL_VISIBLE flag set, any transactions in the transaction history that have not already been received will also have ALL_VISIBLE states recorded. This mean a node that is both an observer and a participant may have some transactions with all states recorded and some with only relevant states recorded, even if those transactions are part of the same chain. As a result, there may be more states present in the vault than would be expected if just those transactions sent with the ALL_VISIBLE recording flag were processed in this way. + +* Nodes may re-record transaction if they have previously recorded them as a participant and wish to record them as an observer. However, + the node cannot resolve a forward chain of transactions if this is done. This means that if you wish to re-record a chain of transactions + and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*. diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 7ccd6db8b9..beeea48ffc 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -59,7 +59,16 @@ interface ServiceHubInternal : ServiceHub { database.transaction { require(txs.any()) { "No transactions passed in for recording" } - val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) } + + // Divide transactions into those seen before and those that are new to this node if ALL_VISIBLE states are being recorded. + // This allows the node to re-record transactions that have previously only been seen at the ONLY_RELEVANT level. Note that + // for transactions being recorded at ONLY_RELEVANT, if this transaction has been seen before its outputs should already + // have been recorded at ONLY_RELEVANT, so there shouldn't be anything to re-record here. + val (recordedTransactions, previouslySeenTxs) = if (statesToRecord != StatesToRecord.ALL_VISIBLE) { + Pair(txs.filter { validatedTransactions.addTransaction(it) }, emptyList()) + } else { + txs.partition { validatedTransactions.addTransaction(it) } + } val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id if (stateMachineRunId != null) { recordedTransactions.forEach { @@ -103,7 +112,7 @@ interface ServiceHubInternal : ServiceHub { // // Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely // to make writes to the ledger very often or at all, we choose to punt this issue for the time being. - vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }) + vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction }) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt index d92b62fe8c..e95ba4b015 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/VaultServiceInternal.kt @@ -15,8 +15,11 @@ interface VaultServiceInternal : VaultService { * indicate whether an update consists entirely of regular or notary change transactions, which may require * different processing logic. */ - fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable) + fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable, previouslySeenTxns: Iterable = emptyList()) - /** Same as notifyAll but with a single transaction. */ + /** + * Same as notifyAll but with a single transaction. + * This does not allow for passing transactions that have already been seen by the node, as this API is only used in testing. + */ fun notify(statesToRecord: StatesToRecord, tx: CoreTransaction) = notifyAll(statesToRecord, listOf(tx)) } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 5480c0264b..634e68a4f1 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -161,7 +161,7 @@ class NodeVaultService( } } - private fun recordUpdate(update: Vault.Update): Vault.Update { + private fun recordUpdate(update: Vault.Update, previouslySeen: Boolean): Vault.Update { if (!update.isEmpty()) { val producedStateRefs = update.produced.map { it.ref } val producedStateRefsMap = update.produced.associateBy { it.ref } @@ -181,15 +181,19 @@ class NodeVaultService( consumedStateRefs.forEach { stateRef -> val state = session.get(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef)) state?.run { - stateStatus = Vault.StateStatus.CONSUMED - consumedTime = clock.instant() - // remove lock (if held) - if (lockId != null) { - lockId = null - lockUpdateTime = clock.instant() - log.trace("Releasing soft lock on consumed state: $stateRef") + // Only update the state if it has not previously been consumed (this could have happened if the transaction is being + // re-recorded. + if (stateStatus != Vault.StateStatus.CONSUMED) { + stateStatus = Vault.StateStatus.CONSUMED + consumedTime = clock.instant() + // remove lock (if held) + if (lockId != null) { + lockId = null + lockUpdateTime = clock.instant() + log.trace("Releasing soft lock on consumed state: $stateRef") + } + session.save(state) } - session.save(state) } } @@ -204,26 +208,30 @@ class NodeVaultService( get() = mutex.locked { _updatesInDbTx } /** Groups adjacent transactions into batches to generate separate net updates per transaction type. */ - override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable) { - if (statesToRecord == StatesToRecord.NONE || !txns.any()) return + override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable, previouslySeenTxns: Iterable) { + if (statesToRecord == StatesToRecord.NONE || (!txns.any() && !previouslySeenTxns.any())) return val batch = mutableListOf() - fun flushBatch() { - val updates = makeUpdates(batch, statesToRecord) - processAndNotify(updates) + fun flushBatch(previouslySeen: Boolean) { + val updates = makeUpdates(batch, statesToRecord, previouslySeen) + processAndNotify(updates, previouslySeen) batch.clear() } - - for (tx in txns) { - if (batch.isNotEmpty() && tx.javaClass != batch.last().javaClass) { - flushBatch() + fun processTransactions(txs: Iterable, previouslySeen: Boolean) { + for (tx in txs) { + if (batch.isNotEmpty() && tx.javaClass != batch.last().javaClass) { + flushBatch(previouslySeen) + } + batch.add(tx) } - batch.add(tx) + flushBatch(previouslySeen) } - flushBatch() + + processTransactions(previouslySeenTxns, true) + processTransactions(txns, false) } - private fun makeUpdates(batch: Iterable, statesToRecord: StatesToRecord): List> { + private fun makeUpdates(batch: Iterable, statesToRecord: StatesToRecord, previouslySeen: Boolean): List> { fun withValidDeserialization(list: List, txId: SecureHash): Map = (0 until list.size).mapNotNull { idx -> try { @@ -251,7 +259,18 @@ class NodeVaultService( StatesToRecord.ONLY_RELEVANT -> outputs.filter { (_, value) -> isRelevant(value.data, keyManagementService.filterMyKeys(outputs.values.flatMap { it.data.participants.map { it.owningKey } }).toSet()) } - StatesToRecord.ALL_VISIBLE -> outputs + StatesToRecord.ALL_VISIBLE -> if (previouslySeen) { + // For transactions being re-recorded, the node must check its vault to find out what states it has already seen. Note + // that some of the outputs previously seen may have been consumed in the meantime, so the check must look for all state + // statuses. + val outputRefs = tx.outRefsOfType().map { it.ref } + val seenRefs = loadStates(outputRefs).map { it.ref } + val unseenRefs = outputRefs - seenRefs + val unseenOutputIdxs = unseenRefs.map { it.index }.toSet() + outputs.filter { it.key in unseenOutputIdxs } + } else { + outputs + } }.map { (idx, _) -> tx.outRef(idx) } // Retrieve all unconsumed states for this transaction's inputs. @@ -334,18 +353,20 @@ class NodeVaultService( (0..(refsList.size - 1) / pageSize).forEach { val offset = it * pageSize val limit = minOf(offset + pageSize, refsList.size) - val page = queryBy(QueryCriteria.VaultQueryCriteria(stateRefs = refsList.subList(offset, limit))).states + val page = queryBy(QueryCriteria.VaultQueryCriteria( + stateRefs = refsList.subList(offset, limit), + status = Vault.StateStatus.ALL)).states states.addAll(page) } } return states } - private fun processAndNotify(updates: List>) { + private fun processAndNotify(updates: List>, previouslySeen: Boolean) { if (updates.isEmpty()) return val netUpdate = updates.reduce { update1, update2 -> update1 + update2 } if (!netUpdate.isEmpty()) { - recordUpdate(netUpdate) + recordUpdate(netUpdate, previouslySeen) mutex.locked { // flowId was required by SoftLockManager to perform auto-registration of soft locks for new states val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/ObserverNodeTransactionTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/ObserverNodeTransactionTests.kt index e681b7cac7..9d3cb1209f 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/ObserverNodeTransactionTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/ObserverNodeTransactionTests.kt @@ -18,10 +18,7 @@ import net.corda.node.testing.MessageData import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity -import net.corda.testing.node.internal.InternalMockNetwork -import net.corda.testing.node.internal.cordappWithPackages -import net.corda.testing.node.internal.enclosedCordapp -import net.corda.testing.node.internal.startFlow +import net.corda.testing.node.internal.* import org.junit.After import org.junit.Before import org.junit.Test @@ -45,6 +42,44 @@ class ObserverNodeTransactionTests { mockNet.stopNodes() } + fun buildTransactionChain(initialMessage: MessageData, chainLength: Int, node: TestStartedNode, notary: Party) { + node.services.startFlow(StartMessageChainFlow(initialMessage, notary)).resultFuture.getOrThrow() + var result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull { + it.state.data.message.value.startsWith(initialMessage.value) + } + + for (_i in 0.until(chainLength -1 )) { + node.services.startFlow(ContinueMessageChainFlow(result!!, notary)).resultFuture.getOrThrow() + result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull { + it.state.data.message.value.startsWith(initialMessage.value) + } + } + } + + fun sendTransactionToObserver(transactionIdx: Int, node: TestStartedNode, regulator: TestStartedNode) { + val transactionList = node.services.validatedTransactions.track().snapshot + node.services.startFlow(ReportToCounterparty(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow() + } + + fun sendTransactionToObserverOnlyRelevant(transactionIdx: Int, node: TestStartedNode, regulator: TestStartedNode) { + val transactionList = node.services.validatedTransactions.track().snapshot + node.services.startFlow(SendTransaction(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow() + } + + fun checkObserverTransactions(expectedMessage: MessageData, regulator: TestStartedNode, numStates: Int = 1) { + val regulatorStates = regulator.services.vaultService.queryBy(MessageChainState::class.java).states.filter { + it.state.data.message.value.startsWith(expectedMessage.value[0]) + } + + assertNotNull(regulatorStates, "Could not find any regulator states") + assertEquals(numStates, regulatorStates.size, "Incorrect number of unconsumed regulator states") + for (state in regulatorStates) { + val retrievedMessage = state.state.data.message + assertEquals(expectedMessage, retrievedMessage, "Final unconsumed regulator state is incorrect") + } + } + + @Test fun `Broadcasting an old transaction does not cause 2 unconsumed states`() { val node = mockNet.createPartyNode(ALICE_NAME) @@ -52,45 +87,81 @@ class ObserverNodeTransactionTests { val notary = mockNet.defaultNotaryIdentity regulator.registerInitiatedFlow(ReceiveReportedTransaction::class.java) - fun buildTransactionChain(initialMessage: MessageData, chainLength: Int) { - node.services.startFlow(StartMessageChainFlow(initialMessage, notary)).resultFuture.getOrThrow() - var result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull { - it.state.data.message.value.startsWith(initialMessage.value) - } - - for (_i in 0.until(chainLength -1 )) { - node.services.startFlow(ContinueMessageChainFlow(result!!, notary)).resultFuture.getOrThrow() - result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull { - it.state.data.message.value.startsWith(initialMessage.value) - } - } - } - - fun sendTransactionToObserver(transactionIdx: Int) { - val transactionList = node.services.validatedTransactions.track().snapshot - node.services.startFlow(ReportToCounterparty(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow() - } - - fun checkObserverTransactions(expectedMessage: MessageData) { - val regulatorStates = regulator.services.vaultService.queryBy(MessageChainState::class.java).states.filter { - it.state.data.message.value.startsWith(expectedMessage.value[0]) - } - - assertNotNull(regulatorStates, "Could not find any regulator states") - assertEquals(1, regulatorStates.size, "Incorrect number of unconsumed regulator states") - val retrievedMessage = regulatorStates.singleOrNull()!!.state.data.message - assertEquals(expectedMessage, retrievedMessage, "Final unconsumed regulator state is incorrect") - } - // Check that sending an old transaction doesn't result in a new unconsumed state val message = MessageData("A") - buildTransactionChain(message, 4) - sendTransactionToObserver(3) - sendTransactionToObserver(1) + buildTransactionChain(message, 4, node, notary) + sendTransactionToObserver(3, node, regulator) + sendTransactionToObserver(1, node, regulator) val outputMessage = MessageData("AAAA") - checkObserverTransactions(outputMessage) + checkObserverTransactions(outputMessage, regulator) } + @Test + fun `Non relevant states are recorded if transaction is re-received with new states to record`() { + val node = mockNet.createPartyNode(ALICE_NAME) + val regulator = mockNet.createPartyNode(BOB_NAME) + val notary = mockNet.defaultNotaryIdentity + regulator.registerInitiatedFlow(ReceiveReportedTransaction::class.java) + regulator.registerInitiatedFlow(ReceiveTransaction::class.java) + + val message = MessageData("A") + buildTransactionChain(message, 4, node, notary) + sendTransactionToObserverOnlyRelevant(3, node, regulator) + sendTransactionToObserver(3, node, regulator) + val outputMessage = MessageData("AAAA") + checkObserverTransactions(outputMessage, regulator) + } + + @Test + fun `Re-recording a transaction adds non-relevant states`() { + val alice = mockNet.createPartyNode(ALICE_NAME) + val bob = mockNet.createPartyNode(BOB_NAME) + val notary = mockNet.defaultNotaryIdentity + bob.registerInitiatedFlow(ReceiveSplitMessagesFlow::class.java) + bob.registerInitiatedFlow(ReceiveReportedTransaction::class.java) + + val message = MessageData("AA") + alice.services.startFlow(SplitMessagesFlow(message, bob.info.singleIdentity(), notary)).resultFuture.getOrThrow() + checkObserverTransactions(message, bob) + sendTransactionToObserver(0, alice, bob) + checkObserverTransactions(message, bob, 2) + } + + @Test + fun `Re-recording a transaction at only relevant does not cause failures`() { + val alice = mockNet.createPartyNode(ALICE_NAME) + val bob = mockNet.createPartyNode(BOB_NAME) + val notary = mockNet.defaultNotaryIdentity + bob.registerInitiatedFlow(ReceiveSplitMessagesFlow::class.java) + bob.registerInitiatedFlow(ReceiveReportedTransaction::class.java) + + val message = MessageData("AA") + alice.services.startFlow(SplitMessagesFlow(message, bob.info.singleIdentity(), notary)).resultFuture.getOrThrow() + checkObserverTransactions(message, bob) + sendTransactionToObserverOnlyRelevant(0, alice, bob) + checkObserverTransactions(message, bob, 1) + } + + @Test + fun `Recording a transaction twice at all visible works`() { + val alice = mockNet.createPartyNode(ALICE_NAME) + val bob = mockNet.createPartyNode(BOB_NAME) + val notary = mockNet.defaultNotaryIdentity + bob.registerInitiatedFlow(ReceiveSplitMessagesFlow::class.java) + bob.registerInitiatedFlow(ReceiveReportedTransaction::class.java) + + val message = MessageData("AA") + alice.services.startFlow(SplitMessagesFlow(message, bob.info.singleIdentity(), notary)).resultFuture.getOrThrow() + checkObserverTransactions(message, bob) + sendTransactionToObserverOnlyRelevant(0, alice, bob) + checkObserverTransactions(message, bob, 1) + sendTransactionToObserver(0, alice, bob) + checkObserverTransactions(message, bob, 2) + sendTransactionToObserver(0, alice, bob) + checkObserverTransactions(message, bob, 2) + sendTransactionToObserverOnlyRelevant(0, alice, bob) + checkObserverTransactions(message, bob, 2) + } @StartableByRPC class StartMessageChainFlow(private val message: MessageData, private val notary: Party) : FlowLogic() { @@ -167,21 +238,111 @@ class ObserverNodeTransactionTests { } } + @StartableByRPC + @InitiatingFlow + class SplitMessagesFlow(private val message: MessageData, + private val counterparty: Party, + private val notary: Party): FlowLogic() { + companion object { + object GENERATING_TRANSACTION : ProgressTracker.Step("Generating transaction based on the message.") + object VERIFYING_TRANSACTION : ProgressTracker.Step("Verifying contract constraints.") + object SIGNING_TRANSACTION : ProgressTracker.Step("Signing transaction with our private key.") + object FINALISING_TRANSACTION : ProgressTracker.Step("Obtaining notary signature and recording transaction.") { + override fun childProgressTracker() = FinalityFlow.tracker() + } + + fun tracker() = ProgressTracker(GENERATING_TRANSACTION, VERIFYING_TRANSACTION, SIGNING_TRANSACTION, FINALISING_TRANSACTION) + } + + override val progressTracker = tracker() + + @Suspendable + override fun call(): SignedTransaction { + progressTracker.currentStep = GENERATING_TRANSACTION + + val messageState = MessageChainState(message = message, by = ourIdentity) + val otherPartyState = MessageChainState(message = message, by = counterparty) + val txCommand = Command(MessageChainContract.Commands.Split(), listOf(ourIdentity.owningKey, counterparty.owningKey)) + val txBuilder = TransactionBuilder(notary) + .addOutputState(messageState) + .addOutputState(otherPartyState) + .addCommand(txCommand) + + progressTracker.currentStep = VERIFYING_TRANSACTION + txBuilder.toWireTransaction(serviceHub).toLedgerTransaction(serviceHub).verify() + + progressTracker.currentStep = SIGNING_TRANSACTION + val signedTx = serviceHub.signInitialTransaction(txBuilder) + val session = initiateFlow(counterparty) + + val stx = subFlow(CollectSignaturesFlow(signedTx, listOf(session))) + + progressTracker.currentStep = FINALISING_TRANSACTION + val finalStx = subFlow(FinalityFlow(stx, listOf(session), FINALISING_TRANSACTION.childProgressTracker())) + session.receive() + return finalStx + } + } + + @InitiatedBy(SplitMessagesFlow::class) + class ReceiveSplitMessagesFlow(private val otherSideSession: FlowSession): FlowLogic() { + + @Suspendable + override fun call(): SignedTransaction { + val flow = object : SignTransactionFlow(otherSideSession) { + @Suspendable + override fun checkTransaction(stx: SignedTransaction) { + + } + } + subFlow(flow) + val stx = subFlow(ReceiveFinalityFlow(otherSideSession)) + otherSideSession.send(Unit) + return stx + } + } + @InitiatingFlow @StartableByRPC - class ReportToCounterparty(private val regulator: Party, private val signedTx: SignedTransaction) : FlowLogic() { + class ReportToCounterparty(private val regulator: Party, + private val signedTx: SignedTransaction) : FlowLogic() { @Suspendable override fun call() { val session = initiateFlow(regulator) subFlow(SendTransactionFlow(session, signedTx)) + session.receive() } } @InitiatedBy(ReportToCounterparty::class) class ReceiveReportedTransaction(private val otherSideSession: FlowSession) : FlowLogic() { + @Suspendable override fun call() { subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE)) + otherSideSession.send(Unit) + } + } + + @InitiatingFlow + @StartableByRPC + class SendTransaction(private val regulator: Party, + private val signedTx: SignedTransaction) : FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(regulator) + subFlow(SendTransactionFlow(session, signedTx)) + session.receive() + } + } + + @InitiatedBy(SendTransaction::class) + class ReceiveTransaction(private val otherSideSession: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ONLY_RELEVANT)) + otherSideSession.send(Unit) } } } diff --git a/node/src/test/kotlin/net/corda/node/testing/MessageChainState.kt b/node/src/test/kotlin/net/corda/node/testing/MessageChainState.kt index d8e3f0cea6..034d6a8d1f 100644 --- a/node/src/test/kotlin/net/corda/node/testing/MessageChainState.kt +++ b/node/src/test/kotlin/net/corda/node/testing/MessageChainState.kt @@ -52,7 +52,7 @@ object MessageChainSchemaV1 : MappedSchema( const val MESSAGE_CHAIN_CONTRACT_PROGRAM_ID = "net.corda.node.testing.MessageChainContract" open class MessageChainContract : Contract { - override fun verify(tx: LedgerTransaction) { + private fun verifySend(tx: LedgerTransaction) { val command = tx.commands.requireSingleCommand() requireThat { // Generic constraints around the IOU transaction. @@ -64,7 +64,22 @@ open class MessageChainContract : Contract { } } + private fun verifySplit(tx: LedgerTransaction) { + requireThat { + "Two output state should be created." using (tx.outputs.size == 2) + } + } + + override fun verify(tx: LedgerTransaction) { + val command = tx.commands.requireSingleCommand().value + when (command) { + is Commands.Send -> verifySend(tx) + is Commands.Split -> verifySplit(tx) + } + } + interface Commands : CommandData { class Send : Commands + class Split : Commands } }