mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
CORDA-1096 - Performance when loading multiple states from the vault (#2614)
* CORDA-1096 - Performance when loading multiple states from the vault (#2609) * Provide efficient `loadStates()` implementation * Replace loops using `loadState` with calls to `loadStates` * Replace `map`/`flatMap` with just a single `flatMap`
This commit is contained in:
parent
50ccb32700
commit
a483e7e8ce
@ -53,7 +53,7 @@ object IdentitySyncFlow {
|
||||
}
|
||||
|
||||
private fun extractOurConfidentialIdentities(): Map<AbstractParty, PartyAndCertificate?> {
|
||||
val states: List<ContractState> = (tx.inputs.map { serviceHub.loadState(it) }.requireNoNulls().map { it.data } + tx.outputs.map { it.data })
|
||||
val states: List<ContractState> = (serviceHub.loadStates(tx.inputs.toSet()).map { it.state.data } + tx.outputs.map { it.data })
|
||||
val identities: Set<AbstractParty> = states.flatMap(ContractState::participants).toSet()
|
||||
// Filter participants down to the set of those not in the network map (are not well known)
|
||||
val confidentialIdentities = identities
|
||||
|
@ -63,7 +63,7 @@ class NotaryFlow {
|
||||
protected fun checkTransaction(): Party {
|
||||
val notaryParty = stx.notary ?: throw IllegalStateException("Transaction does not specify a Notary")
|
||||
check(serviceHub.networkMapCache.isNotary(notaryParty)) { "$notaryParty is not a notary on the network" }
|
||||
check(stx.inputs.all { stateRef -> serviceHub.loadState(stateRef).notary == notaryParty }) {
|
||||
check(serviceHub.loadStates(stx.inputs.toSet()).all { it.state.notary == notaryParty }) {
|
||||
"Input states must have the same Notary"
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,10 @@ interface StateLoader {
|
||||
/**
|
||||
* Given a [StateRef] loads the referenced transaction and looks up the specified output [ContractState].
|
||||
*
|
||||
* *WARNING* Do not use this method unless you really only want a single state - any batch loading should
|
||||
* go through [loadStates] as repeatedly calling [loadState] can lead to repeat deserialsiation work and
|
||||
* severe performance degradation.
|
||||
*
|
||||
* @throws TransactionResolutionException if [stateRef] points to a non-existent transaction.
|
||||
*/
|
||||
@Throws(TransactionResolutionException::class)
|
||||
@ -39,9 +43,7 @@ interface StateLoader {
|
||||
// TODO: future implementation to use a Vault state ref -> contract state BLOB table and perform single query bulk load
|
||||
// as the existing transaction store will become encrypted at some point
|
||||
@Throws(TransactionResolutionException::class)
|
||||
fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
|
||||
return stateRefs.map { StateAndRef(loadState(it), it) }.toSet()
|
||||
}
|
||||
fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>>
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,9 +1,7 @@
|
||||
package net.corda.core.node.services
|
||||
|
||||
import net.corda.core.DoNotImplement
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionResolutionException
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.node.StateLoader
|
||||
@ -26,6 +24,15 @@ interface TransactionStorage : StateLoader {
|
||||
return stx.resolveBaseTransaction(this).outputs[stateRef.index]
|
||||
}
|
||||
|
||||
@Throws(TransactionResolutionException::class)
|
||||
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
|
||||
return stateRefs.groupBy { it.txhash }.flatMap {
|
||||
val stx = getTransaction(it.key) ?: throw TransactionResolutionException(it.key)
|
||||
val baseTx = stx.resolveBaseTransaction(this)
|
||||
it.value.map { StateAndRef(baseTx.outputs[it.index], it) }
|
||||
}.toSet()
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a synchronous Observable of updates. When observations are pushed to the Observer, the vault will already
|
||||
* incorporate the update.
|
||||
|
@ -42,9 +42,7 @@ data class NotaryChangeWireTransaction(
|
||||
|
||||
fun resolve(services: ServiceHub, sigs: List<TransactionSignature>) = resolve(services as StateLoader, sigs)
|
||||
fun resolve(stateLoader: StateLoader, sigs: List<TransactionSignature>): NotaryChangeLedgerTransaction {
|
||||
val resolvedInputs = inputs.map { ref ->
|
||||
stateLoader.loadState(ref).let { StateAndRef(it, ref) }
|
||||
}
|
||||
val resolvedInputs = stateLoader.loadStates(inputs.toSet()).toList()
|
||||
return NotaryChangeLedgerTransaction(resolvedInputs, notary, newNotary, id, sigs)
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ object TwoPartyTradeFlow {
|
||||
val signTransactionFlow = object : SignTransactionFlow(otherSideSession, VERIFYING_AND_SIGNING.childProgressTracker()) {
|
||||
override fun checkTransaction(stx: SignedTransaction) {
|
||||
// Verify that we know who all the participants in the transaction are
|
||||
val states: Iterable<ContractState> = stx.tx.inputs.map { serviceHub.loadState(it).data } + stx.tx.outputs.map { it.data }
|
||||
val states: Iterable<ContractState> = serviceHub.loadStates(stx.tx.inputs.toSet()).map { it.state.data } + stx.tx.outputs.map { it.data }
|
||||
states.forEach { state ->
|
||||
state.participants.forEach { anon ->
|
||||
require(serviceHub.identityService.wellKnownPartyFromAnonymous(anon) != null) {
|
||||
|
@ -2,10 +2,7 @@ package net.corda.node.services
|
||||
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.PartyAndReference
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.cordapp.CordappProvider
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
@ -87,6 +84,7 @@ class AttachmentLoadingTests {
|
||||
|
||||
private val services = object : ServicesForResolution {
|
||||
override fun loadState(stateRef: StateRef): TransactionState<*> = throw NotImplementedError()
|
||||
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> = throw NotImplementedError()
|
||||
override val identityService = rigorousMock<IdentityService>().apply {
|
||||
doReturn(null).whenever(this).partyFromKey(DUMMY_BANK_A.owningKey)
|
||||
}
|
||||
|
@ -77,6 +77,9 @@ data class TestTransactionDSLInterpreter private constructor(
|
||||
|
||||
val services = object : ServicesForResolution by ledgerInterpreter.services {
|
||||
override fun loadState(stateRef: StateRef) = ledgerInterpreter.resolveStateRef<ContractState>(stateRef)
|
||||
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
|
||||
return stateRefs.map { StateAndRef(loadState(it), it) }.toSet()
|
||||
}
|
||||
override val cordappProvider: CordappProvider = ledgerInterpreter.services.cordappProvider
|
||||
}
|
||||
|
||||
|
@ -50,6 +50,13 @@ data class GeneratedLedger(
|
||||
override fun loadState(stateRef: StateRef): TransactionState<*> {
|
||||
return hashTransactionMap[stateRef.txhash]?.outputs?.get(stateRef.index) ?: throw TransactionResolutionException(stateRef.txhash)
|
||||
}
|
||||
|
||||
override fun loadStates(stateRefs: Set<StateRef>): Set<StateAndRef<ContractState>> {
|
||||
return stateRefs.groupBy { it.txhash }.flatMap {
|
||||
val outputs = hashTransactionMap[it.key]?.outputs ?: throw TransactionResolutionException(it.key)
|
||||
it.value.map { StateAndRef(outputs[it.index], it) }
|
||||
}.toSet()
|
||||
}
|
||||
override val identityService = rigorousMock<IdentityService>().apply {
|
||||
doAnswer { identityMap[it.arguments[0]] }.whenever(this).partyFromKey(any())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user