VaultTrack returns undesired states #3276 (#3336)

* filter by contract state in _trackBy

* write tests to check that _trackBy is filtering the states correct and tidy up filtering functions

* remove un needed function

* add change log message for filtering unrelated ContractStates from trackBy
This commit is contained in:
Dan Newton 2018-06-11 17:53:31 +01:00 committed by Joel Dudley
parent 6cc08776b5
commit 5ceb61606a
4 changed files with 119 additions and 6 deletions

View File

@ -7,6 +7,9 @@ release, see :doc:`upgrade-notes`.
Unreleased
==========
* Fixed an issue where ``trackBy`` was returning ``ContractStates`` from a transaction that were not being tracked. The
unrelated ``ContractStates`` will now be filtered out from the returned ``Vault.Update``.
* Introducing the flow hospital - a component of the node that manages flows that have errored and whether they should
be retried from their previous checkpoints or have their errors propagate. Currently it will respond to any error that
occurs during the resolution of a received transaction as part of ``FinalityFlow``. In such a scenerio the receiving

View File

@ -489,12 +489,21 @@ class NodeVaultService(
return database.transaction {
mutex.locked {
val snapshotResults = _queryBy(criteria, paging, sorting, contractStateType)
val updates: Observable<Vault.Update<T>> = uncheckedCast(_updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractStateType, snapshotResults.stateTypes) })
val updates: Observable<Vault.Update<T>> = uncheckedCast(_updatesPublisher.bufferUntilSubscribed()
.filter { it.containsType(contractStateType, snapshotResults.stateTypes) }
.map { filterContractStates(it, contractStateType) })
DataFeed(snapshotResults, updates)
}
}
}
private fun <T : ContractState> filterContractStates(update: Vault.Update<T>, contractStateType: Class<out T>) =
update.copy(consumed = filterByContractState(contractStateType, update.consumed),
produced = filterByContractState(contractStateType, update.produced))
private fun <T : ContractState> filterByContractState(contractStateType: Class<out T>, stateAndRefs: Set<StateAndRef<T>>) =
stateAndRefs.filter { contractStateType.isAssignableFrom(it.state.data.javaClass) }.toSet()
private fun getSession() = database.currentOrNew().session
/**
* Derive list from existing vault states and then incrementally update using vault observables

View File

@ -28,10 +28,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.*
import net.corda.testing.internal.TEST_TX_TIME
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.vault.DUMMY_LINEAR_CONTRACT_PROGRAM_ID
import net.corda.testing.internal.vault.DummyLinearContract
import net.corda.testing.internal.vault.DummyLinearStateSchemaV1
import net.corda.testing.internal.vault.VaultFiller
import net.corda.testing.internal.vault.*
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
import net.corda.testing.node.makeTestIdentityService
@ -2282,4 +2279,73 @@ class VaultQueryTests : VaultQueryTestsBase(), VaultQueryParties by delegate {
)
}
}
@Test
fun `track by only returns updates of tracked type`() {
val updates = database.transaction {
val (snapshot, updates) = vaultService.trackBy<DummyDealContract.State>()
assertThat(snapshot.states).hasSize(0)
val states = vaultFiller.fillWithSomeTestLinearAndDealStates(10).states
this.session.flush()
vaultFiller.consumeLinearStates(states.toList())
updates
}
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
require(consumed.isEmpty()) {}
require(produced.size == 10) {}
require(produced.filter { DummyDealContract.State::class.java.isAssignableFrom(it.state.data::class.java) }.size == 10) {}
}
)
}
}
@Test
fun `track by of super class only returns updates of sub classes of tracked type`() {
val updates = database.transaction {
val (snapshot, updates) = vaultService.trackBy<DealState>()
assertThat(snapshot.states).hasSize(0)
val states = vaultFiller.fillWithSomeTestLinearAndDealStates(10).states
this.session.flush()
vaultFiller.consumeLinearStates(states.toList())
updates
}
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
require(consumed.isEmpty()) {}
require(produced.size == 10) {}
require(produced.filter { DealState::class.java.isAssignableFrom(it.state.data::class.java) }.size == 10) {}
}
)
}
}
@Test
fun `track by of contract state interface returns updates of all states`() {
val updates = database.transaction {
val (snapshot, updates) = vaultService.trackBy<ContractState>()
assertThat(snapshot.states).hasSize(0)
val states = vaultFiller.fillWithSomeTestLinearAndDealStates(10).states
this.session.flush()
vaultFiller.consumeLinearStates(states.toList())
updates
}
updates.expectEvents {
sequence(
expect { (consumed, produced, flowId) ->
require(flowId == null) {}
require(consumed.isEmpty()) {}
require(produced.size == 20) {}
require(produced.filter { ContractState::class.java.isAssignableFrom(it.state.data::class.java) }.size == 20) {}
}
)
}
}
}

View File

@ -127,6 +127,42 @@ class VaultFiller @JvmOverloads constructor(
return Vault(states)
}
@JvmOverloads
fun fillWithSomeTestLinearAndDealStates(numberToCreate: Int,
externalId: String? = null,
participants: List<AbstractParty> = emptyList(),
linearString: String = "",
linearNumber: Long = 0L,
linearBoolean: Boolean = false,
linearTimestamp: Instant = now()): Vault<LinearState> {
val myKey: PublicKey = services.myInfo.chooseIdentity().owningKey
val me = AnonymousParty(myKey)
val issuerKey = defaultNotary.keyPair
val signatureMetadata = SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(issuerKey.public).schemeNumberID)
val transactions: List<SignedTransaction> = (1..numberToCreate).map {
val dummyIssue = TransactionBuilder(notary = defaultNotary.party).apply {
// Issue a Linear state
addOutputState(DummyLinearContract.State(
linearId = UniqueIdentifier(externalId),
participants = participants.plus(me),
linearString = linearString,
linearNumber = linearNumber,
linearBoolean = linearBoolean,
linearTimestamp = linearTimestamp), DUMMY_LINEAR_CONTRACT_PROGRAM_ID)
// Issue a Deal state
addOutputState(DummyDealContract.State(ref = "test ref", participants = participants.plus(me)), DUMMY_DEAL_PROGRAM_ID)
addCommand(dummyCommand())
}
return@map services.signInitialTransaction(dummyIssue).withAdditionalSignature(issuerKey, signatureMetadata)
}
services.recordTransactions(transactions)
// Get all the StateAndRefs of all the generated transactions.
val states = transactions.flatMap { stx ->
stx.tx.outputs.indices.map { i -> stx.tx.outRef<LinearState>(i) }
}
return Vault(states)
}
@JvmOverloads
fun fillWithSomeTestCash(howMuch: Amount<Currency>,
issuerServices: ServiceHub,
@ -167,7 +203,6 @@ class VaultFiller @JvmOverloads constructor(
return Vault(states)
}
/**
* Puts together an issuance transaction for the specified amount that starts out being owned by the given pubkey.
*/