mirror of
https://github.com/corda/corda.git
synced 2025-01-18 10:46:38 +00:00
CORDA-3177 Additional Back Chain Resolution performance enhancements (#5415)
* Applied backchain fetch optimisations * Removed test for bulk fetch * Removed import * Removed extraneous import * Review comment
This commit is contained in:
parent
14b2882bd7
commit
9d6922d11e
@ -20,7 +20,6 @@ import net.corda.core.utilities.getOrThrow
|
|||||||
import net.corda.core.utilities.sequence
|
import net.corda.core.utilities.sequence
|
||||||
import net.corda.core.utilities.unwrap
|
import net.corda.core.utilities.unwrap
|
||||||
import net.corda.coretests.flows.TestNoSecurityDataVendingFlow
|
import net.corda.coretests.flows.TestNoSecurityDataVendingFlow
|
||||||
import net.corda.node.services.DbTransactionsResolver.Companion.IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME
|
|
||||||
import net.corda.testing.contracts.DummyContract
|
import net.corda.testing.contracts.DummyContract
|
||||||
import net.corda.testing.contracts.DummyContractV2
|
import net.corda.testing.contracts.DummyContractV2
|
||||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||||
@ -205,27 +204,6 @@ class ResolveTransactionsFlowTest {
|
|||||||
assertFailsWith<FetchDataFlow.IllegalTransactionRequest> { future.getOrThrow() }
|
assertFailsWith<FetchDataFlow.IllegalTransactionRequest> { future.getOrThrow() }
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `Switches between checkpoint and DB based resolution correctly`() {
|
|
||||||
System.setProperty(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, "20")
|
|
||||||
var numTransactions = 0
|
|
||||||
megaCorpNode.services.validatedTransactions.updates.subscribe {
|
|
||||||
numTransactions++
|
|
||||||
}
|
|
||||||
val txToResolve = makeLargeTransactionChain(50)
|
|
||||||
var numUpdates = 0
|
|
||||||
miniCorpNode.services.validatedTransactions.updates.subscribe {
|
|
||||||
numUpdates++
|
|
||||||
}
|
|
||||||
val p = TestFlow(txToResolve, megaCorp)
|
|
||||||
val future = miniCorpNode.startFlow(p)
|
|
||||||
mockNet.runNetwork()
|
|
||||||
future.getOrThrow()
|
|
||||||
// ResolveTransactionsFlow only stores transaction dependencies and not the requested transaction, so there will be one fewer
|
|
||||||
// transaction stored on the receiving node than on the sending one.
|
|
||||||
assertEquals(numTransactions - 1, numUpdates)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `resolution works when transaction in chain is already resolved`() {
|
fun `resolution works when transaction in chain is already resolved`() {
|
||||||
val (tx1, tx2) = makeTransactions()
|
val (tx1, tx2) = makeTransactions()
|
||||||
|
@ -15,13 +15,6 @@ import net.corda.node.services.api.WritableTransactionStorage
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver {
|
class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver {
|
||||||
companion object {
|
|
||||||
const val IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME = "net.corda.node.dbtransactionsresolver.InMemoryResolutionLimit"
|
|
||||||
|
|
||||||
private val MAX_CHECKPOINT_RESOLUTION: Int = Integer.getInteger(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
private var txsInCheckpoint: MutableMap<SecureHash, SignedTransaction>? = HashMap()
|
|
||||||
private var sortedDependencies: List<SecureHash>? = null
|
private var sortedDependencies: List<SecureHash>? = null
|
||||||
private val logger = flow.logger
|
private val logger = flow.logger
|
||||||
|
|
||||||
@ -59,8 +52,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Request the standalone transaction data (which may refer to things we don't yet have).
|
// Request the standalone transaction data (which may refer to things we don't yet have).
|
||||||
val (existingTxIds, downloadedTxs) = fetchRequiredTransactions(nextRequests)
|
val (existingTxIds, downloadedTxs) = fetchRequiredTransactions(Collections.singleton(nextRequests.first())) // Fetch first item only
|
||||||
|
|
||||||
for (tx in downloadedTxs) {
|
for (tx in downloadedTxs) {
|
||||||
val dependencies = tx.dependencies
|
val dependencies = tx.dependencies
|
||||||
topologicalSort.add(tx.id, dependencies)
|
topologicalSort.add(tx.id, dependencies)
|
||||||
@ -70,24 +62,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
|||||||
for (downloaded in downloadedTxs) {
|
for (downloaded in downloadedTxs) {
|
||||||
suspended = false
|
suspended = false
|
||||||
val dependencies = downloaded.dependencies
|
val dependencies = downloaded.dependencies
|
||||||
val checkpointedTxs = this.txsInCheckpoint
|
// Do not keep in memory as this bloats the checkpoint. Write each item to the database.
|
||||||
if (checkpointedTxs != null) {
|
transactionStorage.addUnverifiedTransaction(downloaded)
|
||||||
if (checkpointedTxs.size < MAX_CHECKPOINT_RESOLUTION) {
|
|
||||||
checkpointedTxs[downloaded.id] = downloaded
|
|
||||||
} else {
|
|
||||||
logger.debug {
|
|
||||||
"Resolving transaction dependencies has reached a checkpoint limit of $MAX_CHECKPOINT_RESOLUTION " +
|
|
||||||
"transactions. Switching to the node database for storing the unverified transactions."
|
|
||||||
}
|
|
||||||
checkpointedTxs.values.forEach(transactionStorage::addUnverifiedTransaction)
|
|
||||||
// This acts as both a flag that we've switched over to storing the backchain into the db, and to remove what's been
|
|
||||||
// built up in the checkpoint
|
|
||||||
this.txsInCheckpoint = null
|
|
||||||
transactionStorage.addUnverifiedTransaction(downloaded)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
transactionStorage.addUnverifiedTransaction(downloaded)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The write locks are only released over a suspend, so need to keep track of whether the flow has been suspended to ensure
|
// The write locks are only released over a suspend, so need to keep track of whether the flow has been suspended to ensure
|
||||||
// that locks are not held beyond each while loop iteration (as doing this would result in a deadlock due to claiming locks
|
// that locks are not held beyond each while loop iteration (as doing this would result in a deadlock due to claiming locks
|
||||||
@ -117,31 +93,18 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
|||||||
|
|
||||||
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
|
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
|
||||||
val sortedDependencies = checkNotNull(this.sortedDependencies)
|
val sortedDependencies = checkNotNull(this.sortedDependencies)
|
||||||
val txsInCheckpoint = this.txsInCheckpoint
|
|
||||||
logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
|
logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
|
||||||
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
|
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
|
||||||
if (txsInCheckpoint != null) {
|
for (txId in sortedDependencies) {
|
||||||
for (txId in sortedDependencies) {
|
// Retrieve and delete the transaction from the unverified store.
|
||||||
val tx = txsInCheckpoint.getValue(txId)
|
val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
|
||||||
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
|
"Somehow the unverified transaction ($txId) that we stored previously is no longer there."
|
||||||
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
|
}
|
||||||
// half way through, it's no big deal, although it might result in us attempting to re-download data
|
if (!isVerified) {
|
||||||
// redundantly next time we attempt verification.
|
|
||||||
tx.verify(flow.serviceHub)
|
tx.verify(flow.serviceHub)
|
||||||
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
||||||
}
|
} else {
|
||||||
} else {
|
logger.debug { "No need to record $txId as it's already been verified" }
|
||||||
for (txId in sortedDependencies) {
|
|
||||||
// Retrieve and delete the transaction from the unverified store.
|
|
||||||
val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
|
|
||||||
"Somehow the unverified transaction ($txId) that we stored previously is no longer there."
|
|
||||||
}
|
|
||||||
if (!isVerified) {
|
|
||||||
tx.verify(flow.serviceHub)
|
|
||||||
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
|
||||||
} else {
|
|
||||||
logger.debug { "No need to record $txId as it's already been verified" }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -161,17 +124,21 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
|||||||
class TopologicalSort {
|
class TopologicalSort {
|
||||||
private val forwardGraph = HashMap<SecureHash, MutableSet<SecureHash>>()
|
private val forwardGraph = HashMap<SecureHash, MutableSet<SecureHash>>()
|
||||||
val transactionIds = LinkedHashSet<SecureHash>()
|
val transactionIds = LinkedHashSet<SecureHash>()
|
||||||
|
private val nonDupeHash = HashMap<SecureHash, SecureHash>()
|
||||||
|
private fun dedupe(sh: SecureHash): SecureHash = nonDupeHash.getOrPut(sh) { sh }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a transaction to the to-be-sorted set of transactions.
|
* Add a transaction to the to-be-sorted set of transactions.
|
||||||
* @param txId The ID of the transaction.
|
* @param txId The ID of the transaction.
|
||||||
* @param dependentIds the IDs of all the transactions [txId] depends on.
|
* @param dependentIds the IDs of all the transactions [txId] depends on.
|
||||||
*/
|
*/
|
||||||
fun add(txId: SecureHash, dependentIds: Set<SecureHash>) {
|
fun add(txIdp: SecureHash, dependentIds: Set<SecureHash>) {
|
||||||
|
val txId = dedupe(txIdp)
|
||||||
require(transactionIds.add(txId)) { "Transaction ID $txId already seen" }
|
require(transactionIds.add(txId)) { "Transaction ID $txId already seen" }
|
||||||
dependentIds.forEach {
|
dependentIds.forEach {
|
||||||
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
|
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
|
||||||
forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(txId)
|
val deDupeIt = dedupe(it)
|
||||||
|
forwardGraph.computeIfAbsent(deDupeIt) { LinkedHashSet() }.add(txId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user