Vault: return a list of unconsumed states rather than a sequence. The lazy evaluation captures some extra context, serialization of which in flows results in errors.

This commit is contained in:
Andrius Dagys 2016-12-15 14:15:46 +00:00
parent f44dd969ce
commit 6f3ed327a0
3 changed files with 10 additions and 13 deletions

View File

@ -38,7 +38,7 @@ val DEFAULT_SESSION_ID = 0L
* Active means they haven't been consumed yet (or we don't know about it).
* Relevant means they contain at least one of our pubkeys.
*/
class Vault(val states: Iterable<StateAndRef<ContractState>>) {
class Vault(val states: List<StateAndRef<ContractState>>) {
@Suppress("UNCHECKED_CAST")
inline fun <reified T : ContractState> statesOfType() = states.filter { it.state.data is T } as List<StateAndRef<T>>
@ -151,14 +151,12 @@ interface VaultService {
* Possibly update the vault by marking as spent states that these transactions consume, and adding any relevant
* new states that they create. You should only insert transactions that have been successfully verified here!
*
* Returns the new vault that resulted from applying the transactions (note: it may quickly become out of date).
*
* TODO: Consider if there's a good way to enforce the must-be-verified requirement in the type system.
*/
fun notifyAll(txns: Iterable<WireTransaction>): Vault
fun notifyAll(txns: Iterable<WireTransaction>)
/** Same as notifyAll but with a single transaction. */
fun notify(tx: WireTransaction): Vault = notifyAll(listOf(tx))
fun notify(tx: WireTransaction) = notifyAll(listOf(tx))
/**
* Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests.

View File

@ -52,7 +52,7 @@ class CordaRPCOpsImpl(
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
return databaseTransaction(database) {
val (vault, updates) = services.vaultService.track()
Pair(vault.states.toList(), updates)
Pair(vault.states, updates)
}
}

View File

@ -103,13 +103,13 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
// For use during publishing only.
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> {
// Order by txhash for if and when transaction storage has some caching.
// Map to StateRef and then to StateAndRef. Use Sequence to avoid conversion to ArrayList that Iterable.map() performs.
return unconsumedStates.asSequence().map {
fun allUnconsumedStates(): List<StateAndRef<ContractState>> {
// Ideally we'd map this transform onto a sequence, but we can't have a lazy list here, since accessing it
// from a flow might end up trying to serialize the captured context - vault internal state or db context.
return unconsumedStates.map {
val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.")
StateAndRef(storedTx.tx.outputs[it.index], it)
}.asIterable()
}
}
fun recordUpdate(update: Vault.Update): Vault.Update {
@ -169,7 +169,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
override val linearHeads: Map<UniqueIdentifier, StateAndRef<LinearState>>
get() = currentVault.states.filterStatesOfType<LinearState>().associateBy { it.state.data.linearId }.mapValues { it.value }
override fun notifyAll(txns: Iterable<WireTransaction>): Vault {
override fun notifyAll(txns: Iterable<WireTransaction>) {
val ourKeys = services.keyManagementService.keys.keys
val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, netDelta, ourKeys) }
if (netDelta != Vault.NoUpdate) {
@ -179,7 +179,6 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
updatesPublisher.onNext(netDelta)
}
}
return currentVault
}
override fun addNoteToTransaction(txnId: SecureHash, noteText: String) {