[CORDA-2086] Allow transactions to be re-recorded using StatesToRecord.ALL_VISIBLE (#5184)

This commit is contained in:
James Higgs 2019-07-04 14:00:42 +01:00 committed by Shams Asari
parent 479d61cfde
commit 075f68f179
8 changed files with 349 additions and 80 deletions

View File

@ -8,7 +8,6 @@ import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
import net.corda.core.node.services.queryBy import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.toFuture
import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
@ -32,7 +31,7 @@ class ReferencedStatesFlowTests {
@Before @Before
fun setup() { fun setup() {
nodes = (0..1).map { nodes = (0..2).map {
mockNet.createNode( mockNet.createNode(
parameters = InternalMockNodeParameters(version = VersionInfo(4, "Blah", "Blah", "Blah")) parameters = InternalMockNodeParameters(version = VersionInfo(4, "Blah", "Blah", "Blah"))
) )
@ -77,7 +76,7 @@ class ReferencedStatesFlowTests {
// 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state. // 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state.
val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow() val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow()
// Wait until node 1 stores the new tx. // Wait until node 1 stores the new tx.
nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow()
// Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!).
// nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0]. // nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0].
assertEquals(2, nodes[1].services.vaultService.queryBy<LinearState>().states.size) assertEquals(2, nodes[1].services.vaultService.queryBy<LinearState>().states.size)
@ -108,7 +107,7 @@ class ReferencedStatesFlowTests {
// 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state. // 2. Use the "newRefState" a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state.
val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow() val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)).resultFuture.getOrThrow()
// Wait until node 1 stores the new tx. // Wait until node 1 stores the new tx.
nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow()
// Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!).
val allRefStates = nodes[1].services.vaultService.queryBy<LinearState>() val allRefStates = nodes[1].services.vaultService.queryBy<LinearState>()
// nodes[1] should have two states. The newly created output and the reference state created by nodes[0]. // nodes[1] should have two states. The newly created output and the reference state created by nodes[0].
@ -125,7 +124,7 @@ class ReferencedStatesFlowTests {
val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)) val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId))
.resultFuture.getOrThrow() .resultFuture.getOrThrow()
// Wait until node 1 stores the new tx. // Wait until node 1 stores the new tx.
nodes[1].services.validatedTransactions.updates.filter { it.id == newTx.id }.toFuture().getOrThrow() nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow()
// Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!).
// nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0]. // nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0].
assertEquals(2, nodes[1].services.vaultService.queryBy<LinearState>().states.size) assertEquals(2, nodes[1].services.vaultService.queryBy<LinearState>().states.size)
@ -144,13 +143,13 @@ class ReferencedStatesFlowTests {
assertEquals(Vault.StateStatus.UNCONSUMED, theReferencedStateOnNodeZero.statesMetadata.single().status) assertEquals(Vault.StateStatus.UNCONSUMED, theReferencedStateOnNodeZero.statesMetadata.single().status)
// 3. Update the reference state but don't share the update. // 3. Update the reference state but don't share the update.
val updatedRefTx = nodes[0].services.startFlow(UpdateRefState(newRefState)).resultFuture.getOrThrow() nodes[0].services.startFlow(UpdateRefState(newRefState)).resultFuture.getOrThrow()
// 4. Use the evolved state as a reference state. // 4. Use the evolved state as a reference state.
val updatedTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId)) val updatedTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId))
.resultFuture.getOrThrow() .resultFuture.getOrThrow()
// Wait until node 1 stores the new tx. // Wait until node 1 stores the new tx.
nodes[1].services.validatedTransactions.updates.filter { it.id == updatedTx.id }.toFuture().getOrThrow() nodes[1].services.validatedTransactions.trackTransaction(updatedTx.id).getOrThrow()
// Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!). // Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!).
// nodes[1] should have four states. The originals, plus the newly created output of type "Regular.State" and the reference state created by nodes[0]. // nodes[1] should have four states. The originals, plus the newly created output of type "Regular.State" and the reference state created by nodes[0].
assertEquals(4, nodes[1].services.vaultService.queryBy<LinearState>(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size) assertEquals(4, nodes[1].services.vaultService.queryBy<LinearState>(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size)
@ -167,6 +166,37 @@ class ReferencedStatesFlowTests {
assertEquals(Vault.StateStatus.CONSUMED, theOriginalReferencedStateOnNodeZero.statesMetadata.single().status) assertEquals(Vault.StateStatus.CONSUMED, theOriginalReferencedStateOnNodeZero.statesMetadata.single().status)
} }
@Test
fun `check consumed reference state is found if a transaction refers to it`() {
// 1. Create a state to be used as a reference state. Don't share it.
val newRefTx = nodes[0].services.startFlow(CreateRefState()).resultFuture.getOrThrow()
val newRefState = newRefTx.tx.outRefsOfType<RefState.State>().single()
// 2. Use the "newRefState" in a transaction involving another party (nodes[1]) which creates a new state. They should store the new state and the reference state.
val newTx = nodes[0].services.startFlow(UseRefState(nodes[1].info.legalIdentities.first(), newRefState.state.data.linearId))
.resultFuture.getOrThrow()
// Wait until node 1 stores the new tx.
nodes[1].services.validatedTransactions.trackTransaction(newTx.id).getOrThrow()
// Check that nodes[1] has finished recording the transaction (and updating the vault.. hopefully!).
// nodes[1] should have two states. The newly created output of type "Regular.State" and the reference state created by nodes[0].
assertEquals(2, nodes[1].services.vaultService.queryBy<LinearState>().states.size)
// 3. Update the reference state but don't share the update.
val updatedRefTx = nodes[0].services.startFlow(UpdateRefState(newRefState)).resultFuture.getOrThrow()
// 4. Now report the transactions that created the two reference states to a third party.
nodes[0].services.startFlow(ReportTransactionFlow(nodes[2].info.legalIdentities.first(), newRefTx)).resultFuture.getOrThrow()
nodes[0].services.startFlow(ReportTransactionFlow(nodes[2].info.legalIdentities.first(), updatedRefTx)).resultFuture.getOrThrow()
// Check that there are two linear states in the vault (note that one is consumed)
assertEquals(2, nodes[2].services.vaultService.queryBy<LinearState>(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size)
// 5. Report the transaction that uses the consumed reference state
nodes[0].services.startFlow(ReportTransactionFlow(nodes[2].info.legalIdentities.first(), newTx)).resultFuture.getOrThrow()
// There should be 3 linear states in the vault
assertEquals(3, nodes[2].services.vaultService.queryBy<LinearState>(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size)
}
// A dummy reference state contract. // A dummy reference state contract.
class RefState : Contract { class RefState : Contract {
companion object { companion object {
@ -284,4 +314,27 @@ class ReferencedStatesFlowTests {
return subFlow(ReceiveFinalityFlow(otherSession, statesToRecord = StatesToRecord.ONLY_RELEVANT)) return subFlow(ReceiveFinalityFlow(otherSession, statesToRecord = StatesToRecord.ONLY_RELEVANT))
} }
} }
// A flow to report a transaction to a third party.
@InitiatingFlow
@StartableByRPC
class ReportTransactionFlow(private val reportee: Party,
private val signedTx: SignedTransaction) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(reportee)
subFlow(SendTransactionFlow(session, signedTx))
session.receive<Unit>()
}
}
@InitiatedBy(ReportTransactionFlow::class)
class ReceiveReportedTransactionFlow(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE))
otherSideSession.send(Unit)
}
}
} }

View File

@ -20,6 +20,13 @@ Version 5.0
``log`` directory. This zip will contain a JSON representation of each checkpointed flow. This information can then be used to determine the ``log`` directory. This zip will contain a JSON representation of each checkpointed flow. This information can then be used to determine the
state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention. state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention.
* It is now possible to re-record transactions if a node wishes to record as an observer a transaction it has participated in. If this is
done, then the node may record new output states that are not relevant to the node.
.. warning:: Nodes may re-record transactions if they have previously recorded them as a participant and wish to record them as an observer.
However, the node cannot resolve the forward chain of transactions if this is done. This means that if you wish to re-record a chain of
transactions and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*.
.. _changelog_v4.2: .. _changelog_v4.2:
Version 4.2 Version 4.2

View File

@ -43,12 +43,12 @@ Caveats
participant/owner. See https://docs.corda.net/api-vault-query.html#example-usage for information on how to do this. participant/owner. See https://docs.corda.net/api-vault-query.html#example-usage for information on how to do this.
This also means that ``Cash.generateSpend`` should not be used when recording ``Cash.State`` states as an observer This also means that ``Cash.generateSpend`` should not be used when recording ``Cash.State`` states as an observer
* Nodes only record each transaction once. If a node has already recorded a transaction in non-observer mode, it cannot
later re-record the same transaction as an observer. This issue is tracked here:
https://r3-cev.atlassian.net/browse/CORDA-883
* When an observer node is sent a transaction with the ALL_VISIBLE flag set, any transactions in the transaction history * When an observer node is sent a transaction with the ALL_VISIBLE flag set, any transactions in the transaction history
that have not already been received will also have ALL_VISIBLE states recorded. This mean a node that is both an observer that have not already been received will also have ALL_VISIBLE states recorded. This mean a node that is both an observer
and a participant may have some transactions with all states recorded and some with only relevant states recorded, even and a participant may have some transactions with all states recorded and some with only relevant states recorded, even
if those transactions are part of the same chain. As a result, there may be more states present in the vault than would be if those transactions are part of the same chain. As a result, there may be more states present in the vault than would be
expected if just those transactions sent with the ALL_VISIBLE recording flag were processed in this way. expected if just those transactions sent with the ALL_VISIBLE recording flag were processed in this way.
* Nodes may re-record transaction if they have previously recorded them as a participant and wish to record them as an observer. However,
the node cannot resolve a forward chain of transactions if this is done. This means that if you wish to re-record a chain of transactions
and get the new output states to be correctly marked as consumed, the full chain must be sent to the node *in order*.

View File

@ -59,7 +59,16 @@ interface ServiceHubInternal : ServiceHub {
database.transaction { database.transaction {
require(txs.any()) { "No transactions passed in for recording" } require(txs.any()) { "No transactions passed in for recording" }
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
// Divide transactions into those seen before and those that are new to this node if ALL_VISIBLE states are being recorded.
// This allows the node to re-record transactions that have previously only been seen at the ONLY_RELEVANT level. Note that
// for transactions being recorded at ONLY_RELEVANT, if this transaction has been seen before its outputs should already
// have been recorded at ONLY_RELEVANT, so there shouldn't be anything to re-record here.
val (recordedTransactions, previouslySeenTxs) = if (statesToRecord != StatesToRecord.ALL_VISIBLE) {
Pair(txs.filter { validatedTransactions.addTransaction(it) }, emptyList())
} else {
txs.partition { validatedTransactions.addTransaction(it) }
}
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
if (stateMachineRunId != null) { if (stateMachineRunId != null) {
recordedTransactions.forEach { recordedTransactions.forEach {
@ -103,7 +112,7 @@ interface ServiceHubInternal : ServiceHub {
// //
// Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely // Because the primary use case for recording irrelevant states is observer/regulator nodes, who are unlikely
// to make writes to the ledger very often or at all, we choose to punt this issue for the time being. // to make writes to the ledger very often or at all, we choose to punt this issue for the time being.
vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }) vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction })
} }
} }
} }

View File

@ -15,8 +15,11 @@ interface VaultServiceInternal : VaultService {
* indicate whether an update consists entirely of regular or notary change transactions, which may require * indicate whether an update consists entirely of regular or notary change transactions, which may require
* different processing logic. * different processing logic.
*/ */
fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>, previouslySeenTxns: Iterable<CoreTransaction> = emptyList())
/** Same as notifyAll but with a single transaction. */ /**
* Same as notifyAll but with a single transaction.
* This does not allow for passing transactions that have already been seen by the node, as this API is only used in testing.
*/
fun notify(statesToRecord: StatesToRecord, tx: CoreTransaction) = notifyAll(statesToRecord, listOf(tx)) fun notify(statesToRecord: StatesToRecord, tx: CoreTransaction) = notifyAll(statesToRecord, listOf(tx))
} }

View File

@ -161,7 +161,7 @@ class NodeVaultService(
} }
} }
private fun recordUpdate(update: Vault.Update<ContractState>): Vault.Update<ContractState> { private fun recordUpdate(update: Vault.Update<ContractState>, previouslySeen: Boolean): Vault.Update<ContractState> {
if (!update.isEmpty()) { if (!update.isEmpty()) {
val producedStateRefs = update.produced.map { it.ref } val producedStateRefs = update.produced.map { it.ref }
val producedStateRefsMap = update.produced.associateBy { it.ref } val producedStateRefsMap = update.produced.associateBy { it.ref }
@ -181,15 +181,19 @@ class NodeVaultService(
consumedStateRefs.forEach { stateRef -> consumedStateRefs.forEach { stateRef ->
val state = session.get<VaultSchemaV1.VaultStates>(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef)) val state = session.get<VaultSchemaV1.VaultStates>(VaultSchemaV1.VaultStates::class.java, PersistentStateRef(stateRef))
state?.run { state?.run {
stateStatus = Vault.StateStatus.CONSUMED // Only update the state if it has not previously been consumed (this could have happened if the transaction is being
consumedTime = clock.instant() // re-recorded.
// remove lock (if held) if (stateStatus != Vault.StateStatus.CONSUMED) {
if (lockId != null) { stateStatus = Vault.StateStatus.CONSUMED
lockId = null consumedTime = clock.instant()
lockUpdateTime = clock.instant() // remove lock (if held)
log.trace("Releasing soft lock on consumed state: $stateRef") if (lockId != null) {
lockId = null
lockUpdateTime = clock.instant()
log.trace("Releasing soft lock on consumed state: $stateRef")
}
session.save(state)
} }
session.save(state)
} }
} }
@ -204,26 +208,30 @@ class NodeVaultService(
get() = mutex.locked { _updatesInDbTx } get() = mutex.locked { _updatesInDbTx }
/** Groups adjacent transactions into batches to generate separate net updates per transaction type. */ /** Groups adjacent transactions into batches to generate separate net updates per transaction type. */
override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>) { override fun notifyAll(statesToRecord: StatesToRecord, txns: Iterable<CoreTransaction>, previouslySeenTxns: Iterable<CoreTransaction>) {
if (statesToRecord == StatesToRecord.NONE || !txns.any()) return if (statesToRecord == StatesToRecord.NONE || (!txns.any() && !previouslySeenTxns.any())) return
val batch = mutableListOf<CoreTransaction>() val batch = mutableListOf<CoreTransaction>()
fun flushBatch() { fun flushBatch(previouslySeen: Boolean) {
val updates = makeUpdates(batch, statesToRecord) val updates = makeUpdates(batch, statesToRecord, previouslySeen)
processAndNotify(updates) processAndNotify(updates, previouslySeen)
batch.clear() batch.clear()
} }
fun processTransactions(txs: Iterable<CoreTransaction>, previouslySeen: Boolean) {
for (tx in txns) { for (tx in txs) {
if (batch.isNotEmpty() && tx.javaClass != batch.last().javaClass) { if (batch.isNotEmpty() && tx.javaClass != batch.last().javaClass) {
flushBatch() flushBatch(previouslySeen)
}
batch.add(tx)
} }
batch.add(tx) flushBatch(previouslySeen)
} }
flushBatch()
processTransactions(previouslySeenTxns, true)
processTransactions(txns, false)
} }
private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord): List<Vault.Update<ContractState>> { private fun makeUpdates(batch: Iterable<CoreTransaction>, statesToRecord: StatesToRecord, previouslySeen: Boolean): List<Vault.Update<ContractState>> {
fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> = (0 until list.size).mapNotNull { idx -> fun <T> withValidDeserialization(list: List<T>, txId: SecureHash): Map<Int, T> = (0 until list.size).mapNotNull { idx ->
try { try {
@ -251,7 +259,18 @@ class NodeVaultService(
StatesToRecord.ONLY_RELEVANT -> outputs.filter { (_, value) -> StatesToRecord.ONLY_RELEVANT -> outputs.filter { (_, value) ->
isRelevant(value.data, keyManagementService.filterMyKeys(outputs.values.flatMap { it.data.participants.map { it.owningKey } }).toSet()) isRelevant(value.data, keyManagementService.filterMyKeys(outputs.values.flatMap { it.data.participants.map { it.owningKey } }).toSet())
} }
StatesToRecord.ALL_VISIBLE -> outputs StatesToRecord.ALL_VISIBLE -> if (previouslySeen) {
// For transactions being re-recorded, the node must check its vault to find out what states it has already seen. Note
// that some of the outputs previously seen may have been consumed in the meantime, so the check must look for all state
// statuses.
val outputRefs = tx.outRefsOfType<ContractState>().map { it.ref }
val seenRefs = loadStates(outputRefs).map { it.ref }
val unseenRefs = outputRefs - seenRefs
val unseenOutputIdxs = unseenRefs.map { it.index }.toSet()
outputs.filter { it.key in unseenOutputIdxs }
} else {
outputs
}
}.map { (idx, _) -> tx.outRef<ContractState>(idx) } }.map { (idx, _) -> tx.outRef<ContractState>(idx) }
// Retrieve all unconsumed states for this transaction's inputs. // Retrieve all unconsumed states for this transaction's inputs.
@ -334,18 +353,20 @@ class NodeVaultService(
(0..(refsList.size - 1) / pageSize).forEach { (0..(refsList.size - 1) / pageSize).forEach {
val offset = it * pageSize val offset = it * pageSize
val limit = minOf(offset + pageSize, refsList.size) val limit = minOf(offset + pageSize, refsList.size)
val page = queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(stateRefs = refsList.subList(offset, limit))).states val page = queryBy<ContractState>(QueryCriteria.VaultQueryCriteria(
stateRefs = refsList.subList(offset, limit),
status = Vault.StateStatus.ALL)).states
states.addAll(page) states.addAll(page)
} }
} }
return states return states
} }
private fun processAndNotify(updates: List<Vault.Update<ContractState>>) { private fun processAndNotify(updates: List<Vault.Update<ContractState>>, previouslySeen: Boolean) {
if (updates.isEmpty()) return if (updates.isEmpty()) return
val netUpdate = updates.reduce { update1, update2 -> update1 + update2 } val netUpdate = updates.reduce { update1, update2 -> update1 + update2 }
if (!netUpdate.isEmpty()) { if (!netUpdate.isEmpty()) {
recordUpdate(netUpdate) recordUpdate(netUpdate, previouslySeen)
mutex.locked { mutex.locked {
// flowId was required by SoftLockManager to perform auto-registration of soft locks for new states // flowId was required by SoftLockManager to perform auto-registration of soft locks for new states
val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid val uuid = (Strand.currentStrand() as? FlowStateMachineImpl<*>)?.id?.uuid

View File

@ -18,10 +18,7 @@ import net.corda.node.testing.MessageData
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.cordappWithPackages
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.startFlow
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
@ -45,6 +42,44 @@ class ObserverNodeTransactionTests {
mockNet.stopNodes() mockNet.stopNodes()
} }
fun buildTransactionChain(initialMessage: MessageData, chainLength: Int, node: TestStartedNode, notary: Party) {
node.services.startFlow(StartMessageChainFlow(initialMessage, notary)).resultFuture.getOrThrow()
var result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull {
it.state.data.message.value.startsWith(initialMessage.value)
}
for (_i in 0.until(chainLength -1 )) {
node.services.startFlow(ContinueMessageChainFlow(result!!, notary)).resultFuture.getOrThrow()
result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull {
it.state.data.message.value.startsWith(initialMessage.value)
}
}
}
fun sendTransactionToObserver(transactionIdx: Int, node: TestStartedNode, regulator: TestStartedNode) {
val transactionList = node.services.validatedTransactions.track().snapshot
node.services.startFlow(ReportToCounterparty(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow()
}
fun sendTransactionToObserverOnlyRelevant(transactionIdx: Int, node: TestStartedNode, regulator: TestStartedNode) {
val transactionList = node.services.validatedTransactions.track().snapshot
node.services.startFlow(SendTransaction(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow()
}
fun checkObserverTransactions(expectedMessage: MessageData, regulator: TestStartedNode, numStates: Int = 1) {
val regulatorStates = regulator.services.vaultService.queryBy(MessageChainState::class.java).states.filter {
it.state.data.message.value.startsWith(expectedMessage.value[0])
}
assertNotNull(regulatorStates, "Could not find any regulator states")
assertEquals(numStates, regulatorStates.size, "Incorrect number of unconsumed regulator states")
for (state in regulatorStates) {
val retrievedMessage = state.state.data.message
assertEquals(expectedMessage, retrievedMessage, "Final unconsumed regulator state is incorrect")
}
}
@Test @Test
fun `Broadcasting an old transaction does not cause 2 unconsumed states`() { fun `Broadcasting an old transaction does not cause 2 unconsumed states`() {
val node = mockNet.createPartyNode(ALICE_NAME) val node = mockNet.createPartyNode(ALICE_NAME)
@ -52,45 +87,81 @@ class ObserverNodeTransactionTests {
val notary = mockNet.defaultNotaryIdentity val notary = mockNet.defaultNotaryIdentity
regulator.registerInitiatedFlow(ReceiveReportedTransaction::class.java) regulator.registerInitiatedFlow(ReceiveReportedTransaction::class.java)
fun buildTransactionChain(initialMessage: MessageData, chainLength: Int) {
node.services.startFlow(StartMessageChainFlow(initialMessage, notary)).resultFuture.getOrThrow()
var result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull {
it.state.data.message.value.startsWith(initialMessage.value)
}
for (_i in 0.until(chainLength -1 )) {
node.services.startFlow(ContinueMessageChainFlow(result!!, notary)).resultFuture.getOrThrow()
result = node.services.vaultService.queryBy(MessageChainState::class.java).states.singleOrNull {
it.state.data.message.value.startsWith(initialMessage.value)
}
}
}
fun sendTransactionToObserver(transactionIdx: Int) {
val transactionList = node.services.validatedTransactions.track().snapshot
node.services.startFlow(ReportToCounterparty(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow()
}
fun checkObserverTransactions(expectedMessage: MessageData) {
val regulatorStates = regulator.services.vaultService.queryBy(MessageChainState::class.java).states.filter {
it.state.data.message.value.startsWith(expectedMessage.value[0])
}
assertNotNull(regulatorStates, "Could not find any regulator states")
assertEquals(1, regulatorStates.size, "Incorrect number of unconsumed regulator states")
val retrievedMessage = regulatorStates.singleOrNull()!!.state.data.message
assertEquals(expectedMessage, retrievedMessage, "Final unconsumed regulator state is incorrect")
}
// Check that sending an old transaction doesn't result in a new unconsumed state // Check that sending an old transaction doesn't result in a new unconsumed state
val message = MessageData("A") val message = MessageData("A")
buildTransactionChain(message, 4) buildTransactionChain(message, 4, node, notary)
sendTransactionToObserver(3) sendTransactionToObserver(3, node, regulator)
sendTransactionToObserver(1) sendTransactionToObserver(1, node, regulator)
val outputMessage = MessageData("AAAA") val outputMessage = MessageData("AAAA")
checkObserverTransactions(outputMessage) checkObserverTransactions(outputMessage, regulator)
} }
@Test
fun `Non relevant states are recorded if transaction is re-received with new states to record`() {
val node = mockNet.createPartyNode(ALICE_NAME)
val regulator = mockNet.createPartyNode(BOB_NAME)
val notary = mockNet.defaultNotaryIdentity
regulator.registerInitiatedFlow(ReceiveReportedTransaction::class.java)
regulator.registerInitiatedFlow(ReceiveTransaction::class.java)
val message = MessageData("A")
buildTransactionChain(message, 4, node, notary)
sendTransactionToObserverOnlyRelevant(3, node, regulator)
sendTransactionToObserver(3, node, regulator)
val outputMessage = MessageData("AAAA")
checkObserverTransactions(outputMessage, regulator)
}
@Test
fun `Re-recording a transaction adds non-relevant states`() {
val alice = mockNet.createPartyNode(ALICE_NAME)
val bob = mockNet.createPartyNode(BOB_NAME)
val notary = mockNet.defaultNotaryIdentity
bob.registerInitiatedFlow(ReceiveSplitMessagesFlow::class.java)
bob.registerInitiatedFlow(ReceiveReportedTransaction::class.java)
val message = MessageData("AA")
alice.services.startFlow(SplitMessagesFlow(message, bob.info.singleIdentity(), notary)).resultFuture.getOrThrow()
checkObserverTransactions(message, bob)
sendTransactionToObserver(0, alice, bob)
checkObserverTransactions(message, bob, 2)
}
@Test
fun `Re-recording a transaction at only relevant does not cause failures`() {
val alice = mockNet.createPartyNode(ALICE_NAME)
val bob = mockNet.createPartyNode(BOB_NAME)
val notary = mockNet.defaultNotaryIdentity
bob.registerInitiatedFlow(ReceiveSplitMessagesFlow::class.java)
bob.registerInitiatedFlow(ReceiveReportedTransaction::class.java)
val message = MessageData("AA")
alice.services.startFlow(SplitMessagesFlow(message, bob.info.singleIdentity(), notary)).resultFuture.getOrThrow()
checkObserverTransactions(message, bob)
sendTransactionToObserverOnlyRelevant(0, alice, bob)
checkObserverTransactions(message, bob, 1)
}
@Test
fun `Recording a transaction twice at all visible works`() {
val alice = mockNet.createPartyNode(ALICE_NAME)
val bob = mockNet.createPartyNode(BOB_NAME)
val notary = mockNet.defaultNotaryIdentity
bob.registerInitiatedFlow(ReceiveSplitMessagesFlow::class.java)
bob.registerInitiatedFlow(ReceiveReportedTransaction::class.java)
val message = MessageData("AA")
alice.services.startFlow(SplitMessagesFlow(message, bob.info.singleIdentity(), notary)).resultFuture.getOrThrow()
checkObserverTransactions(message, bob)
sendTransactionToObserverOnlyRelevant(0, alice, bob)
checkObserverTransactions(message, bob, 1)
sendTransactionToObserver(0, alice, bob)
checkObserverTransactions(message, bob, 2)
sendTransactionToObserver(0, alice, bob)
checkObserverTransactions(message, bob, 2)
sendTransactionToObserverOnlyRelevant(0, alice, bob)
checkObserverTransactions(message, bob, 2)
}
@StartableByRPC @StartableByRPC
class StartMessageChainFlow(private val message: MessageData, private val notary: Party) : FlowLogic<SignedTransaction>() { class StartMessageChainFlow(private val message: MessageData, private val notary: Party) : FlowLogic<SignedTransaction>() {
@ -167,21 +238,111 @@ class ObserverNodeTransactionTests {
} }
} }
@StartableByRPC
@InitiatingFlow
class SplitMessagesFlow(private val message: MessageData,
private val counterparty: Party,
private val notary: Party): FlowLogic<SignedTransaction>() {
companion object {
object GENERATING_TRANSACTION : ProgressTracker.Step("Generating transaction based on the message.")
object VERIFYING_TRANSACTION : ProgressTracker.Step("Verifying contract constraints.")
object SIGNING_TRANSACTION : ProgressTracker.Step("Signing transaction with our private key.")
object FINALISING_TRANSACTION : ProgressTracker.Step("Obtaining notary signature and recording transaction.") {
override fun childProgressTracker() = FinalityFlow.tracker()
}
fun tracker() = ProgressTracker(GENERATING_TRANSACTION, VERIFYING_TRANSACTION, SIGNING_TRANSACTION, FINALISING_TRANSACTION)
}
override val progressTracker = tracker()
@Suspendable
override fun call(): SignedTransaction {
progressTracker.currentStep = GENERATING_TRANSACTION
val messageState = MessageChainState(message = message, by = ourIdentity)
val otherPartyState = MessageChainState(message = message, by = counterparty)
val txCommand = Command(MessageChainContract.Commands.Split(), listOf(ourIdentity.owningKey, counterparty.owningKey))
val txBuilder = TransactionBuilder(notary)
.addOutputState(messageState)
.addOutputState(otherPartyState)
.addCommand(txCommand)
progressTracker.currentStep = VERIFYING_TRANSACTION
txBuilder.toWireTransaction(serviceHub).toLedgerTransaction(serviceHub).verify()
progressTracker.currentStep = SIGNING_TRANSACTION
val signedTx = serviceHub.signInitialTransaction(txBuilder)
val session = initiateFlow(counterparty)
val stx = subFlow(CollectSignaturesFlow(signedTx, listOf(session)))
progressTracker.currentStep = FINALISING_TRANSACTION
val finalStx = subFlow(FinalityFlow(stx, listOf(session), FINALISING_TRANSACTION.childProgressTracker()))
session.receive<Unit>()
return finalStx
}
}
@InitiatedBy(SplitMessagesFlow::class)
class ReceiveSplitMessagesFlow(private val otherSideSession: FlowSession): FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val flow = object : SignTransactionFlow(otherSideSession) {
@Suspendable
override fun checkTransaction(stx: SignedTransaction) {
}
}
subFlow(flow)
val stx = subFlow(ReceiveFinalityFlow(otherSideSession))
otherSideSession.send(Unit)
return stx
}
}
@InitiatingFlow @InitiatingFlow
@StartableByRPC @StartableByRPC
class ReportToCounterparty(private val regulator: Party, private val signedTx: SignedTransaction) : FlowLogic<Unit>() { class ReportToCounterparty(private val regulator: Party,
private val signedTx: SignedTransaction) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
val session = initiateFlow(regulator) val session = initiateFlow(regulator)
subFlow(SendTransactionFlow(session, signedTx)) subFlow(SendTransactionFlow(session, signedTx))
session.receive<Unit>()
} }
} }
@InitiatedBy(ReportToCounterparty::class) @InitiatedBy(ReportToCounterparty::class)
class ReceiveReportedTransaction(private val otherSideSession: FlowSession) : FlowLogic<Unit>() { class ReceiveReportedTransaction(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE)) subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE))
otherSideSession.send(Unit)
}
}
@InitiatingFlow
@StartableByRPC
class SendTransaction(private val regulator: Party,
private val signedTx: SignedTransaction) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(regulator)
subFlow(SendTransactionFlow(session, signedTx))
session.receive<Unit>()
}
}
@InitiatedBy(SendTransaction::class)
class ReceiveTransaction(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ONLY_RELEVANT))
otherSideSession.send(Unit)
} }
} }
} }

View File

@ -52,7 +52,7 @@ object MessageChainSchemaV1 : MappedSchema(
const val MESSAGE_CHAIN_CONTRACT_PROGRAM_ID = "net.corda.node.testing.MessageChainContract" const val MESSAGE_CHAIN_CONTRACT_PROGRAM_ID = "net.corda.node.testing.MessageChainContract"
open class MessageChainContract : Contract { open class MessageChainContract : Contract {
override fun verify(tx: LedgerTransaction) { private fun verifySend(tx: LedgerTransaction) {
val command = tx.commands.requireSingleCommand<Commands.Send>() val command = tx.commands.requireSingleCommand<Commands.Send>()
requireThat { requireThat {
// Generic constraints around the IOU transaction. // Generic constraints around the IOU transaction.
@ -64,7 +64,22 @@ open class MessageChainContract : Contract {
} }
} }
private fun verifySplit(tx: LedgerTransaction) {
requireThat {
"Two output state should be created." using (tx.outputs.size == 2)
}
}
override fun verify(tx: LedgerTransaction) {
val command = tx.commands.requireSingleCommand<Commands>().value
when (command) {
is Commands.Send -> verifySend(tx)
is Commands.Split -> verifySplit(tx)
}
}
interface Commands : CommandData { interface Commands : CommandData {
class Send : Commands class Send : Commands
class Split : Commands
} }
} }