diff --git a/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt index 130a29dfe4..8e31469636 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt @@ -20,7 +20,6 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.sequence import net.corda.core.utilities.unwrap import net.corda.coretests.flows.TestNoSecurityDataVendingFlow -import net.corda.node.services.DbTransactionsResolver.Companion.IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContractV2 import net.corda.testing.core.DUMMY_BANK_A_NAME @@ -205,27 +204,6 @@ class ResolveTransactionsFlowTest { assertFailsWith { future.getOrThrow() } } - @Test - fun `Switches between checkpoint and DB based resolution correctly`() { - System.setProperty(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, "20") - var numTransactions = 0 - megaCorpNode.services.validatedTransactions.updates.subscribe { - numTransactions++ - } - val txToResolve = makeLargeTransactionChain(50) - var numUpdates = 0 - miniCorpNode.services.validatedTransactions.updates.subscribe { - numUpdates++ - } - val p = TestFlow(txToResolve, megaCorp) - val future = miniCorpNode.startFlow(p) - mockNet.runNetwork() - future.getOrThrow() - // ResolveTransactionsFlow only stores transaction dependencies and not the requested transaction, so there will be one fewer - // transaction stored on the receiving node than on the sending one. - assertEquals(numTransactions - 1, numUpdates) - } - @Test fun `resolution works when transaction in chain is already resolved`() { val (tx1, tx2) = makeTransactions() diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt index b3e1a320a7..afd668f649 100644 --- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt +++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt @@ -15,13 +15,6 @@ import net.corda.node.services.api.WritableTransactionStorage import java.util.* class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver { - companion object { - const val IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME = "net.corda.node.dbtransactionsresolver.InMemoryResolutionLimit" - - private val MAX_CHECKPOINT_RESOLUTION: Int = Integer.getInteger(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, 0) - } - - private var txsInCheckpoint: MutableMap? = HashMap() private var sortedDependencies: List? = null private val logger = flow.logger @@ -59,8 +52,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa } // Request the standalone transaction data (which may refer to things we don't yet have). - val (existingTxIds, downloadedTxs) = fetchRequiredTransactions(nextRequests) - + val (existingTxIds, downloadedTxs) = fetchRequiredTransactions(Collections.singleton(nextRequests.first())) // Fetch first item only for (tx in downloadedTxs) { val dependencies = tx.dependencies topologicalSort.add(tx.id, dependencies) @@ -70,24 +62,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa for (downloaded in downloadedTxs) { suspended = false val dependencies = downloaded.dependencies - val checkpointedTxs = this.txsInCheckpoint - if (checkpointedTxs != null) { - if (checkpointedTxs.size < MAX_CHECKPOINT_RESOLUTION) { - checkpointedTxs[downloaded.id] = downloaded - } else { - logger.debug { - "Resolving transaction dependencies has reached a checkpoint limit of $MAX_CHECKPOINT_RESOLUTION " + - "transactions. Switching to the node database for storing the unverified transactions." - } - checkpointedTxs.values.forEach(transactionStorage::addUnverifiedTransaction) - // This acts as both a flag that we've switched over to storing the backchain into the db, and to remove what's been - // built up in the checkpoint - this.txsInCheckpoint = null - transactionStorage.addUnverifiedTransaction(downloaded) - } - } else { - transactionStorage.addUnverifiedTransaction(downloaded) - } + // Do not keep in memory as this bloats the checkpoint. Write each item to the database. + transactionStorage.addUnverifiedTransaction(downloaded) // The write locks are only released over a suspend, so need to keep track of whether the flow has been suspended to ensure // that locks are not held beyond each while loop iteration (as doing this would result in a deadlock due to claiming locks @@ -117,31 +93,18 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa override fun recordDependencies(usedStatesToRecord: StatesToRecord) { val sortedDependencies = checkNotNull(this.sortedDependencies) - val txsInCheckpoint = this.txsInCheckpoint logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" } val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage - if (txsInCheckpoint != null) { - for (txId in sortedDependencies) { - val tx = txsInCheckpoint.getValue(txId) - // For each transaction, verify it and insert it into the database. As we are iterating over them in a - // depth-first order, we should not encounter any verification failures due to missing data. If we fail - // half way through, it's no big deal, although it might result in us attempting to re-download data - // redundantly next time we attempt verification. + for (txId in sortedDependencies) { + // Retrieve and delete the transaction from the unverified store. + val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) { + "Somehow the unverified transaction ($txId) that we stored previously is no longer there." + } + if (!isVerified) { tx.verify(flow.serviceHub) flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx)) - } - } else { - for (txId in sortedDependencies) { - // Retrieve and delete the transaction from the unverified store. - val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) { - "Somehow the unverified transaction ($txId) that we stored previously is no longer there." - } - if (!isVerified) { - tx.verify(flow.serviceHub) - flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx)) - } else { - logger.debug { "No need to record $txId as it's already been verified" } - } + } else { + logger.debug { "No need to record $txId as it's already been verified" } } } } @@ -161,17 +124,21 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa class TopologicalSort { private val forwardGraph = HashMap>() val transactionIds = LinkedHashSet() + private val nonDupeHash = HashMap() + private fun dedupe(sh: SecureHash): SecureHash = nonDupeHash.getOrPut(sh) { sh } /** * Add a transaction to the to-be-sorted set of transactions. * @param txId The ID of the transaction. * @param dependentIds the IDs of all the transactions [txId] depends on. */ - fun add(txId: SecureHash, dependentIds: Set) { + fun add(txIdp: SecureHash, dependentIds: Set) { + val txId = dedupe(txIdp) require(transactionIds.add(txId)) { "Transaction ID $txId already seen" } dependentIds.forEach { // Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is). - forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(txId) + val deDupeIt = dedupe(it) + forwardGraph.computeIfAbsent(deDupeIt) { LinkedHashSet() }.add(txId) } }