From 6f3ed327a0fd9262f8511d4bea19c1794e49be3d Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Thu, 15 Dec 2016 14:15:46 +0000 Subject: [PATCH] Vault: return a list of unconsumed states rather than a sequence. The lazy evaluation captures some extra context, serialization of which in flows results in errors. --- .../kotlin/net/corda/core/node/services/Services.kt | 8 +++----- .../net/corda/node/internal/CordaRPCOpsImpl.kt | 2 +- .../corda/node/services/vault/NodeVaultService.kt | 13 ++++++------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index 5103ed5a9c..5b20338bb8 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -38,7 +38,7 @@ val DEFAULT_SESSION_ID = 0L * Active means they haven't been consumed yet (or we don't know about it). * Relevant means they contain at least one of our pubkeys. */ -class Vault(val states: Iterable>) { +class Vault(val states: List>) { @Suppress("UNCHECKED_CAST") inline fun statesOfType() = states.filter { it.state.data is T } as List> @@ -151,14 +151,12 @@ interface VaultService { * Possibly update the vault by marking as spent states that these transactions consume, and adding any relevant * new states that they create. You should only insert transactions that have been successfully verified here! * - * Returns the new vault that resulted from applying the transactions (note: it may quickly become out of date). - * * TODO: Consider if there's a good way to enforce the must-be-verified requirement in the type system. */ - fun notifyAll(txns: Iterable): Vault + fun notifyAll(txns: Iterable) /** Same as notifyAll but with a single transaction. */ - fun notify(tx: WireTransaction): Vault = notifyAll(listOf(tx)) + fun notify(tx: WireTransaction) = notifyAll(listOf(tx)) /** * Provide a [Future] for when a [StateRef] is consumed, which can be very useful in building tests. diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 533ec5e42c..06afda26c8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -52,7 +52,7 @@ class CordaRPCOpsImpl( override fun vaultAndUpdates(): Pair>, Observable> { return databaseTransaction(database) { val (vault, updates) = services.vaultService.track() - Pair(vault.states.toList(), updates) + Pair(vault.states, updates) } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index bce32f0b26..74b98fc42f 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -103,13 +103,13 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT // For use during publishing only. val updatesPublisher: rx.Observer get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher) - fun allUnconsumedStates(): Iterable> { - // Order by txhash for if and when transaction storage has some caching. - // Map to StateRef and then to StateAndRef. Use Sequence to avoid conversion to ArrayList that Iterable.map() performs. - return unconsumedStates.asSequence().map { + fun allUnconsumedStates(): List> { + // Ideally we'd map this transform onto a sequence, but we can't have a lazy list here, since accessing it + // from a flow might end up trying to serialize the captured context - vault internal state or db context. + return unconsumedStates.map { val storedTx = services.storageService.validatedTransactions.getTransaction(it.txhash) ?: throw Error("Found transaction hash ${it.txhash} in unconsumed contract states that is not in transaction storage.") StateAndRef(storedTx.tx.outputs[it.index], it) - }.asIterable() + } } fun recordUpdate(update: Vault.Update): Vault.Update { @@ -169,7 +169,7 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT override val linearHeads: Map> get() = currentVault.states.filterStatesOfType().associateBy { it.state.data.linearId }.mapValues { it.value } - override fun notifyAll(txns: Iterable): Vault { + override fun notifyAll(txns: Iterable) { val ourKeys = services.keyManagementService.keys.keys val netDelta = txns.fold(Vault.NoUpdate) { netDelta, txn -> netDelta + makeUpdate(txn, netDelta, ourKeys) } if (netDelta != Vault.NoUpdate) { @@ -179,7 +179,6 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT updatesPublisher.onNext(netDelta) } } - return currentVault } override fun addNoteToTransaction(txnId: SecureHash, noteText: String) {