From 69d1b4e4e5d21289f5a107b903e1dbe7519a5f12 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 16 Aug 2019 12:34:25 +0100 Subject: [PATCH] CORDA-3138: Removed InMemoryTransactionsResolver as it's not needed and other resolution cleanup (#5370) --- .../internal/ResolveTransactionsFlowTest.kt | 4 +- .../node/services/DbTransactionsResolver.kt | 36 ++--- .../services/InMemoryTransactionsResolver.kt | 142 ------------------ .../node/services/api/ServiceHubInternal.kt | 67 +++++++-- .../net/corda/testing/node/MockServices.kt | 7 +- 5 files changed, 76 insertions(+), 180 deletions(-) delete mode 100644 node/src/main/kotlin/net/corda/node/services/InMemoryTransactionsResolver.kt diff --git a/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt index a0ef9c3f24..a21177b2f3 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/internal/ResolveTransactionsFlowTest.kt @@ -20,7 +20,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.sequence import net.corda.core.utilities.unwrap import net.corda.coretests.flows.TestNoSecurityDataVendingFlow -import net.corda.node.services.DbTransactionsResolver +import net.corda.node.services.DbTransactionsResolver.Companion.IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContractV2 import net.corda.testing.core.DUMMY_BANK_A_NAME @@ -207,7 +207,7 @@ class ResolveTransactionsFlowTest { @Test fun `Switches between checkpoint and DB based resolution correctly`() { - System.setProperty("${DbTransactionsResolver::class.java.name}.max-checkpoint-resolution", "20") + System.setProperty(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, "20") var numTransactions = 0 megaCorpNode.services.validatedTransactions.updates.subscribe { numTransactions++ diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt index d278cb1c5f..3814c6a687 100644 --- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt +++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt @@ -14,14 +14,14 @@ import net.corda.core.utilities.seconds import net.corda.node.services.api.WritableTransactionStorage import java.util.* -private const val IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME = "net.corda.node.dbtransactionsresolver.InMemoryResolutionLimit" - class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver { companion object { + const val IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME = "net.corda.node.dbtransactionsresolver.InMemoryResolutionLimit" + private val MAX_CHECKPOINT_RESOLUTION: Int = Integer.getInteger(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, 0) } - private var downloadedTxs: MutableMap? = HashMap() + private var txsInCheckpoint: MutableMap? = HashMap() private var sortedDependencies: List? = null private val logger = flow.logger @@ -59,26 +59,18 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa } // Request the standalone transaction data (which may refer to things we don't yet have). - val requestedTxs = fetchRequiredTransactions(nextRequests) + val (existingTxIds, downloadedTxs) = fetchRequiredTransactions(nextRequests) - // When acquiring the write locks for the transaction chain, it is important that all required locks are acquired in the same - // order when recording both verified and unverified transactions. In the verified case, the transactions must be recorded in - // back chain order (i.e. oldest first), so this must also happen for unverified transactions. This sort ensures that locks are - // acquired in the right order in the case the transactions should be stored in the database as unverified. The main topological - // sort is also updated here to ensure that this contains everything that needs locking in cases where the resolver switches - // from checkpointing to storing unverified transactions in the database. - val lockingSort = TopologicalSort() - for (tx in requestedTxs.second) { + for (tx in downloadedTxs) { val dependencies = tx.dependencies - lockingSort.add(tx.id, dependencies) topologicalSort.add(tx.id, dependencies) } var suspended = true - for (downloaded in requestedTxs.second) { + for (downloaded in downloadedTxs) { suspended = false val dependencies = downloaded.dependencies - val downloadedTxs = this.downloadedTxs + val downloadedTxs = this.txsInCheckpoint if (downloadedTxs != null) { if (downloadedTxs.size < MAX_CHECKPOINT_RESOLUTION) { downloadedTxs[downloaded.id] = downloaded @@ -90,7 +82,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa downloadedTxs.values.forEach(transactionStorage::addUnverifiedTransaction) // This acts as both a flag that we've switched over to storing the backchain into the db, and to remove what's been // built up in the checkpoint - this.downloadedTxs = null + this.txsInCheckpoint = null transactionStorage.addUnverifiedTransaction(downloaded) } } else { @@ -115,21 +107,21 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa // It's possible that the node has a transaction in storage already. Dependencies should also be present for this transaction, // so just remove these IDs from the set of next requests. - nextRequests.removeAll(requestedTxs.first) + nextRequests.removeAll(existingTxIds) } sortedDependencies = topologicalSort.complete() - logger.debug { "Downloaded ${sortedDependencies?.size ?: 0} dependencies from remote peer for transactions ${flow.txHashes}" } + logger.debug { "Downloaded ${sortedDependencies?.size} dependencies from remote peer for transactions ${flow.txHashes}" } } override fun recordDependencies(usedStatesToRecord: StatesToRecord) { - logger.debug { "Recording ${this.sortedDependencies?.size ?: 0} dependencies for ${flow.txHashes.size} transactions" } - val downloadedTxs = this.downloadedTxs val sortedDependencies = checkNotNull(this.sortedDependencies) + val txsInCheckpoint = this.txsInCheckpoint + logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" } val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage - if (downloadedTxs != null) { + if (txsInCheckpoint != null) { for (txId in sortedDependencies) { - val tx = downloadedTxs.getValue(txId) + val tx = txsInCheckpoint.getValue(txId) // For each transaction, verify it and insert it into the database. As we are iterating over them in a // depth-first order, we should not encounter any verification failures due to missing data. If we fail // half way through, it's no big deal, although it might result in us attempting to re-download data diff --git a/node/src/main/kotlin/net/corda/node/services/InMemoryTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/InMemoryTransactionsResolver.kt deleted file mode 100644 index 7e2ff16e71..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/InMemoryTransactionsResolver.kt +++ /dev/null @@ -1,142 +0,0 @@ -package net.corda.node.services - -import co.paralleluniverse.fibers.Suspendable -import net.corda.core.crypto.SecureHash -import net.corda.core.flows.FlowException -import net.corda.core.internal.FetchTransactionsFlow -import net.corda.core.internal.ResolveTransactionsFlow -import net.corda.core.internal.TransactionsResolver -import net.corda.core.internal.dependencies -import net.corda.core.node.StatesToRecord -import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.debug -import java.util.* - -class InMemoryTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver { - companion object { - /** The maximum number of transactions this flow will try to download before bailing out. */ - var transactionCountLimit = 5000 - set(value) { - require(value > 0) { "$value is not a valid count limit" } - field = value - } - } - - private var sortedDependencies: List? = null - private val logger = flow.logger - - @Suspendable - override fun downloadDependencies() { - // Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth - // first traversal across the dependency graph. - // - // TODO: This approach has two problems. Analyze and resolve them: - // - // (1) This flow leaks private data. If you download a transaction and then do NOT request a - // dependency, it means you already have it, which in turn means you must have been involved with it before - // somehow, either in the tx itself or in any following spend of it. If there were no following spends, then - // your peer knows for sure that you were involved ... this is bad! The only obvious ways to fix this are - // something like onion routing of requests, secure hardware, or both. - // - // (2) If the identity service changes the assumed identity of one of the public keys, it's possible - // that the "tx in db is valid" invariant is violated if one of the contracts checks the identity! Should - // the db contain the identities that were resolved when the transaction was first checked, or should we - // accept this kind of change is possible? Most likely solution is for identity data to be an attachment. - - logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" } - val nextRequests = LinkedHashSet(flow.txHashes) // Keep things unique but ordered, for unit test stability. - val topologicalSort = TopologicalSort() - val seenIds = HashSet() - - while (nextRequests.isNotEmpty()) { - // Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the - // graph we're traversing. - nextRequests.removeAll(seenIds) - if (nextRequests.isEmpty()) { - // Done early. - break - } - - // Request the standalone transaction data (which may refer to things we don't yet have). - val requestedTxs = flow.subFlow(FetchTransactionsFlow(nextRequests, flow.otherSide)) - val freshDownloads = requestedTxs.downloaded - val existingTxs = requestedTxs.fromDisk - - for (downloaded in freshDownloads) { - require(seenIds.add(downloaded.id)) { "Transaction ID ${downloaded.id} already seen" } - if (seenIds.size > transactionCountLimit) { - throw ExcessivelyLargeTransactionGraph() - } - - val dependencies = downloaded.dependencies - topologicalSort.add(downloaded, dependencies) - - flow.fetchMissingAttachments(downloaded) - flow.fetchMissingNetworkParameters(downloaded) - - // Add all input states and reference input states to the work queue. - nextRequests.addAll(dependencies) - } - - // It's possible that the node has a transaction in storage already. Dependencies should also be present for this transaction, - // so just remove these IDs from the set of next requests. - nextRequests.removeAll(existingTxs.map { it.id }) - } - - sortedDependencies = topologicalSort.complete() - logger.debug { "Downloaded ${sortedDependencies?.size ?: 0} dependencies from remote peer for transactions ${flow.txHashes}" } - } - - override fun recordDependencies(usedStatesToRecord: StatesToRecord) { - logger.debug { "Recording ${this.sortedDependencies?.size ?: 0} dependencies for ${flow.txHashes.size} transactions" } - for (tx in checkNotNull(sortedDependencies)) { - // For each transaction, verify it and insert it into the database. As we are iterating over them in a - // depth-first order, we should not encounter any verification failures due to missing data. If we fail - // half way through, it's no big deal, although it might result in us attempting to re-download data - // redundantly next time we attempt verification. - tx.verify(flow.serviceHub) - flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx)) - } - } - - class ExcessivelyLargeTransactionGraph : FlowException() - - /** - * Provides a way to topologically sort SignedTransactions. This means that given any two transactions T1 and T2 in the - * list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2. - */ - class TopologicalSort { - private val forwardGraph = HashMap>() - private val transactions = ArrayList() - - /** - * Add a transaction to the to-be-sorted set of transactions. - */ - fun add(stx: SignedTransaction, dependencies: Set) { - dependencies.forEach { - // Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is). - forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(stx) - } - transactions += stx - } - - /** - * Return the sorted list of signed transactions. - */ - fun complete(): List { - val visited = HashSet(transactions.size) - val result = ArrayList(transactions.size) - - fun visit(transaction: SignedTransaction) { - if (visited.add(transaction.id)) { - forwardGraph[transaction.id]?.forEach(::visit) - result += transaction - } - } - - transactions.forEach(::visit) - - return result.apply(Collections::reverse) - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 1a383b2a40..984d0b216f 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -19,7 +19,6 @@ import net.corda.core.utilities.contextLogger import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.cordapp.CordappProviderInternal import net.corda.node.services.DbTransactionsResolver -import net.corda.node.services.InMemoryTransactionsResolver import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.MessagingService import net.corda.node.services.network.NetworkMapUpdater @@ -28,6 +27,7 @@ import net.corda.node.services.statemachine.ExternalEvent import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.nodeapi.internal.persistence.CordaPersistence import java.security.PublicKey +import java.util.* interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase { override val nodeReady: OpenFuture @@ -53,25 +53,24 @@ interface ServiceHubInternal : ServiceHubCoreInternal { companion object { private val log = contextLogger() - private fun topologicalSort(transactions: Iterable): List { - if ((transactions as? List)?.size == 1) return transactions - val sort = InMemoryTransactionsResolver.TopologicalSort() + private fun topologicalSort(transactions: Collection): Collection { + if (transactions.size == 1) return transactions + val sort = TopologicalSort() for (tx in transactions) { sort.add(tx, tx.dependencies) } return sort.complete() } - // TODO Why is txs an Iterable and not a Collection?? fun recordTransactions(statesToRecord: StatesToRecord, - txs: Iterable, + txs: Collection, validatedTransactions: WritableTransactionStorage, stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage, vaultService: VaultServiceInternal, database: CordaPersistence) { database.transaction { - require(txs.any()) { "No transactions passed in for recording" } + require(txs.isNotEmpty()) { "No transactions passed in for recording" } val orderedTxs = topologicalSort(txs) // Divide transactions into those seen before and those that are new to this node if ALL_VISIBLE states are being recorded. @@ -79,9 +78,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal { // for transactions being recorded at ONLY_RELEVANT, if this transaction has been seen before its outputs should already // have been recorded at ONLY_RELEVANT, so there shouldn't be anything to re-record here. val (recordedTransactions, previouslySeenTxs) = if (statesToRecord != StatesToRecord.ALL_VISIBLE) { - Pair(orderedTxs.filter { validatedTransactions.addTransaction(it) }, emptyList()) + orderedTxs.filter(validatedTransactions::addTransaction) to emptyList() } else { - orderedTxs.partition { validatedTransactions.addTransaction(it) } + orderedTxs.partition(validatedTransactions::addTransaction) } val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id if (stateMachineRunId != null) { @@ -156,11 +155,55 @@ interface ServiceHubInternal : ServiceHubCoreInternal { val cacheFactory: NamedCacheFactory override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) { - recordTransactions(statesToRecord, txs, validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) + recordTransactions( + statesToRecord, + txs as? Collection ?: txs.toList(), // We can't change txs to a Collection as it's now part of the public API + validatedTransactions, + stateMachineRecordedTransactionMapping, + vaultService, + database + ) } - override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver { - return DbTransactionsResolver(flow) + override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver = DbTransactionsResolver(flow) + + /** + * Provides a way to topologically sort SignedTransactions. This means that given any two transactions T1 and T2 in the + * list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2. + */ + private class TopologicalSort { + private val forwardGraph = HashMap>() + private val transactions = ArrayList() + + /** + * Add a transaction to the to-be-sorted set of transactions. + */ + fun add(stx: SignedTransaction, dependencies: Set) { + dependencies.forEach { + // Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is). + forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(stx) + } + transactions += stx + } + + /** + * Return the sorted list of signed transactions. + */ + fun complete(): List { + val visited = HashSet(transactions.size) + val result = ArrayList(transactions.size) + + fun visit(transaction: SignedTransaction) { + if (visited.add(transaction.id)) { + forwardGraph[transaction.id]?.forEach(::visit) + result += transaction + } + } + + transactions.forEach(::visit) + + return result.apply(Collections::reverse) + } } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 0b6c28e37d..87f7973646 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -196,11 +196,14 @@ open class MockServices private constructor( override var networkParametersService: NetworkParametersService = MockNetworkParametersStorage(networkParameters) override val vaultService: VaultService = makeVaultService(schemaService, persistence, cordappLoader) override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) { - ServiceHubInternal.recordTransactions(statesToRecord, txs, + ServiceHubInternal.recordTransactions( + statesToRecord, + txs as? Collection ?: txs.toList(), validatedTransactions as WritableTransactionStorage, mockStateMachineRecordedTransactionMappingStorage, vaultService as VaultServiceInternal, - persistence) + persistence + ) } override fun jdbcSession(): Connection = persistence.createSession()