mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
CORDA-3138: Removed InMemoryTransactionsResolver as it's not needed and other resolution cleanup (#5370)
This commit is contained in:
parent
ee1aa9fea1
commit
69d1b4e4e5
@ -20,7 +20,7 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.coretests.flows.TestNoSecurityDataVendingFlow
|
||||
import net.corda.node.services.DbTransactionsResolver
|
||||
import net.corda.node.services.DbTransactionsResolver.Companion.IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.contracts.DummyContractV2
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
@ -207,7 +207,7 @@ class ResolveTransactionsFlowTest {
|
||||
|
||||
@Test
|
||||
fun `Switches between checkpoint and DB based resolution correctly`() {
|
||||
System.setProperty("${DbTransactionsResolver::class.java.name}.max-checkpoint-resolution", "20")
|
||||
System.setProperty(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, "20")
|
||||
var numTransactions = 0
|
||||
megaCorpNode.services.validatedTransactions.updates.subscribe {
|
||||
numTransactions++
|
||||
|
@ -14,14 +14,14 @@ import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import java.util.*
|
||||
|
||||
private const val IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME = "net.corda.node.dbtransactionsresolver.InMemoryResolutionLimit"
|
||||
|
||||
class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver {
|
||||
companion object {
|
||||
const val IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME = "net.corda.node.dbtransactionsresolver.InMemoryResolutionLimit"
|
||||
|
||||
private val MAX_CHECKPOINT_RESOLUTION: Int = Integer.getInteger(IN_MEMORY_RESOLUTION_LIMIT_PROP_NAME, 0)
|
||||
}
|
||||
|
||||
private var downloadedTxs: MutableMap<SecureHash, SignedTransaction>? = HashMap()
|
||||
private var txsInCheckpoint: MutableMap<SecureHash, SignedTransaction>? = HashMap()
|
||||
private var sortedDependencies: List<SecureHash>? = null
|
||||
private val logger = flow.logger
|
||||
|
||||
@ -59,26 +59,18 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
}
|
||||
|
||||
// Request the standalone transaction data (which may refer to things we don't yet have).
|
||||
val requestedTxs = fetchRequiredTransactions(nextRequests)
|
||||
val (existingTxIds, downloadedTxs) = fetchRequiredTransactions(nextRequests)
|
||||
|
||||
// When acquiring the write locks for the transaction chain, it is important that all required locks are acquired in the same
|
||||
// order when recording both verified and unverified transactions. In the verified case, the transactions must be recorded in
|
||||
// back chain order (i.e. oldest first), so this must also happen for unverified transactions. This sort ensures that locks are
|
||||
// acquired in the right order in the case the transactions should be stored in the database as unverified. The main topological
|
||||
// sort is also updated here to ensure that this contains everything that needs locking in cases where the resolver switches
|
||||
// from checkpointing to storing unverified transactions in the database.
|
||||
val lockingSort = TopologicalSort()
|
||||
for (tx in requestedTxs.second) {
|
||||
for (tx in downloadedTxs) {
|
||||
val dependencies = tx.dependencies
|
||||
lockingSort.add(tx.id, dependencies)
|
||||
topologicalSort.add(tx.id, dependencies)
|
||||
}
|
||||
|
||||
var suspended = true
|
||||
for (downloaded in requestedTxs.second) {
|
||||
for (downloaded in downloadedTxs) {
|
||||
suspended = false
|
||||
val dependencies = downloaded.dependencies
|
||||
val downloadedTxs = this.downloadedTxs
|
||||
val downloadedTxs = this.txsInCheckpoint
|
||||
if (downloadedTxs != null) {
|
||||
if (downloadedTxs.size < MAX_CHECKPOINT_RESOLUTION) {
|
||||
downloadedTxs[downloaded.id] = downloaded
|
||||
@ -90,7 +82,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
downloadedTxs.values.forEach(transactionStorage::addUnverifiedTransaction)
|
||||
// This acts as both a flag that we've switched over to storing the backchain into the db, and to remove what's been
|
||||
// built up in the checkpoint
|
||||
this.downloadedTxs = null
|
||||
this.txsInCheckpoint = null
|
||||
transactionStorage.addUnverifiedTransaction(downloaded)
|
||||
}
|
||||
} else {
|
||||
@ -115,21 +107,21 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
|
||||
|
||||
// It's possible that the node has a transaction in storage already. Dependencies should also be present for this transaction,
|
||||
// so just remove these IDs from the set of next requests.
|
||||
nextRequests.removeAll(requestedTxs.first)
|
||||
nextRequests.removeAll(existingTxIds)
|
||||
}
|
||||
|
||||
sortedDependencies = topologicalSort.complete()
|
||||
logger.debug { "Downloaded ${sortedDependencies?.size ?: 0} dependencies from remote peer for transactions ${flow.txHashes}" }
|
||||
logger.debug { "Downloaded ${sortedDependencies?.size} dependencies from remote peer for transactions ${flow.txHashes}" }
|
||||
}
|
||||
|
||||
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
|
||||
logger.debug { "Recording ${this.sortedDependencies?.size ?: 0} dependencies for ${flow.txHashes.size} transactions" }
|
||||
val downloadedTxs = this.downloadedTxs
|
||||
val sortedDependencies = checkNotNull(this.sortedDependencies)
|
||||
val txsInCheckpoint = this.txsInCheckpoint
|
||||
logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
|
||||
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
|
||||
if (downloadedTxs != null) {
|
||||
if (txsInCheckpoint != null) {
|
||||
for (txId in sortedDependencies) {
|
||||
val tx = downloadedTxs.getValue(txId)
|
||||
val tx = txsInCheckpoint.getValue(txId)
|
||||
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
|
||||
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
|
||||
// half way through, it's no big deal, although it might result in us attempting to re-download data
|
||||
|
@ -1,142 +0,0 @@
|
||||
package net.corda.node.services
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.internal.FetchTransactionsFlow
|
||||
import net.corda.core.internal.ResolveTransactionsFlow
|
||||
import net.corda.core.internal.TransactionsResolver
|
||||
import net.corda.core.internal.dependencies
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.debug
|
||||
import java.util.*
|
||||
|
||||
class InMemoryTransactionsResolver(private val flow: ResolveTransactionsFlow) : TransactionsResolver {
|
||||
companion object {
|
||||
/** The maximum number of transactions this flow will try to download before bailing out. */
|
||||
var transactionCountLimit = 5000
|
||||
set(value) {
|
||||
require(value > 0) { "$value is not a valid count limit" }
|
||||
field = value
|
||||
}
|
||||
}
|
||||
|
||||
private var sortedDependencies: List<SignedTransaction>? = null
|
||||
private val logger = flow.logger
|
||||
|
||||
@Suspendable
|
||||
override fun downloadDependencies() {
|
||||
// Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth
|
||||
// first traversal across the dependency graph.
|
||||
//
|
||||
// TODO: This approach has two problems. Analyze and resolve them:
|
||||
//
|
||||
// (1) This flow leaks private data. If you download a transaction and then do NOT request a
|
||||
// dependency, it means you already have it, which in turn means you must have been involved with it before
|
||||
// somehow, either in the tx itself or in any following spend of it. If there were no following spends, then
|
||||
// your peer knows for sure that you were involved ... this is bad! The only obvious ways to fix this are
|
||||
// something like onion routing of requests, secure hardware, or both.
|
||||
//
|
||||
// (2) If the identity service changes the assumed identity of one of the public keys, it's possible
|
||||
// that the "tx in db is valid" invariant is violated if one of the contracts checks the identity! Should
|
||||
// the db contain the identities that were resolved when the transaction was first checked, or should we
|
||||
// accept this kind of change is possible? Most likely solution is for identity data to be an attachment.
|
||||
|
||||
logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" }
|
||||
val nextRequests = LinkedHashSet<SecureHash>(flow.txHashes) // Keep things unique but ordered, for unit test stability.
|
||||
val topologicalSort = TopologicalSort()
|
||||
val seenIds = HashSet<SecureHash>()
|
||||
|
||||
while (nextRequests.isNotEmpty()) {
|
||||
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
|
||||
// graph we're traversing.
|
||||
nextRequests.removeAll(seenIds)
|
||||
if (nextRequests.isEmpty()) {
|
||||
// Done early.
|
||||
break
|
||||
}
|
||||
|
||||
// Request the standalone transaction data (which may refer to things we don't yet have).
|
||||
val requestedTxs = flow.subFlow(FetchTransactionsFlow(nextRequests, flow.otherSide))
|
||||
val freshDownloads = requestedTxs.downloaded
|
||||
val existingTxs = requestedTxs.fromDisk
|
||||
|
||||
for (downloaded in freshDownloads) {
|
||||
require(seenIds.add(downloaded.id)) { "Transaction ID ${downloaded.id} already seen" }
|
||||
if (seenIds.size > transactionCountLimit) {
|
||||
throw ExcessivelyLargeTransactionGraph()
|
||||
}
|
||||
|
||||
val dependencies = downloaded.dependencies
|
||||
topologicalSort.add(downloaded, dependencies)
|
||||
|
||||
flow.fetchMissingAttachments(downloaded)
|
||||
flow.fetchMissingNetworkParameters(downloaded)
|
||||
|
||||
// Add all input states and reference input states to the work queue.
|
||||
nextRequests.addAll(dependencies)
|
||||
}
|
||||
|
||||
// It's possible that the node has a transaction in storage already. Dependencies should also be present for this transaction,
|
||||
// so just remove these IDs from the set of next requests.
|
||||
nextRequests.removeAll(existingTxs.map { it.id })
|
||||
}
|
||||
|
||||
sortedDependencies = topologicalSort.complete()
|
||||
logger.debug { "Downloaded ${sortedDependencies?.size ?: 0} dependencies from remote peer for transactions ${flow.txHashes}" }
|
||||
}
|
||||
|
||||
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
|
||||
logger.debug { "Recording ${this.sortedDependencies?.size ?: 0} dependencies for ${flow.txHashes.size} transactions" }
|
||||
for (tx in checkNotNull(sortedDependencies)) {
|
||||
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
|
||||
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
|
||||
// half way through, it's no big deal, although it might result in us attempting to re-download data
|
||||
// redundantly next time we attempt verification.
|
||||
tx.verify(flow.serviceHub)
|
||||
flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
|
||||
}
|
||||
}
|
||||
|
||||
class ExcessivelyLargeTransactionGraph : FlowException()
|
||||
|
||||
/**
|
||||
* Provides a way to topologically sort SignedTransactions. This means that given any two transactions T1 and T2 in the
|
||||
* list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2.
|
||||
*/
|
||||
class TopologicalSort {
|
||||
private val forwardGraph = HashMap<SecureHash, MutableSet<SignedTransaction>>()
|
||||
private val transactions = ArrayList<SignedTransaction>()
|
||||
|
||||
/**
|
||||
* Add a transaction to the to-be-sorted set of transactions.
|
||||
*/
|
||||
fun add(stx: SignedTransaction, dependencies: Set<SecureHash>) {
|
||||
dependencies.forEach {
|
||||
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
|
||||
forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(stx)
|
||||
}
|
||||
transactions += stx
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the sorted list of signed transactions.
|
||||
*/
|
||||
fun complete(): List<SignedTransaction> {
|
||||
val visited = HashSet<SecureHash>(transactions.size)
|
||||
val result = ArrayList<SignedTransaction>(transactions.size)
|
||||
|
||||
fun visit(transaction: SignedTransaction) {
|
||||
if (visited.add(transaction.id)) {
|
||||
forwardGraph[transaction.id]?.forEach(::visit)
|
||||
result += transaction
|
||||
}
|
||||
}
|
||||
|
||||
transactions.forEach(::visit)
|
||||
|
||||
return result.apply(Collections::reverse)
|
||||
}
|
||||
}
|
||||
}
|
@ -19,7 +19,6 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.internal.InitiatedFlowFactory
|
||||
import net.corda.node.internal.cordapp.CordappProviderInternal
|
||||
import net.corda.node.services.DbTransactionsResolver
|
||||
import net.corda.node.services.InMemoryTransactionsResolver
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.network.NetworkMapUpdater
|
||||
@ -28,6 +27,7 @@ import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
|
||||
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase {
|
||||
override val nodeReady: OpenFuture<Void?>
|
||||
@ -53,25 +53,24 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
private fun topologicalSort(transactions: Iterable<SignedTransaction>): List<SignedTransaction> {
|
||||
if ((transactions as? List)?.size == 1) return transactions
|
||||
val sort = InMemoryTransactionsResolver.TopologicalSort()
|
||||
private fun topologicalSort(transactions: Collection<SignedTransaction>): Collection<SignedTransaction> {
|
||||
if (transactions.size == 1) return transactions
|
||||
val sort = TopologicalSort()
|
||||
for (tx in transactions) {
|
||||
sort.add(tx, tx.dependencies)
|
||||
}
|
||||
return sort.complete()
|
||||
}
|
||||
|
||||
// TODO Why is txs an Iterable and not a Collection??
|
||||
fun recordTransactions(statesToRecord: StatesToRecord,
|
||||
txs: Iterable<SignedTransaction>,
|
||||
txs: Collection<SignedTransaction>,
|
||||
validatedTransactions: WritableTransactionStorage,
|
||||
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
|
||||
vaultService: VaultServiceInternal,
|
||||
database: CordaPersistence) {
|
||||
|
||||
database.transaction {
|
||||
require(txs.any()) { "No transactions passed in for recording" }
|
||||
require(txs.isNotEmpty()) { "No transactions passed in for recording" }
|
||||
|
||||
val orderedTxs = topologicalSort(txs)
|
||||
// Divide transactions into those seen before and those that are new to this node if ALL_VISIBLE states are being recorded.
|
||||
@ -79,9 +78,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
// for transactions being recorded at ONLY_RELEVANT, if this transaction has been seen before its outputs should already
|
||||
// have been recorded at ONLY_RELEVANT, so there shouldn't be anything to re-record here.
|
||||
val (recordedTransactions, previouslySeenTxs) = if (statesToRecord != StatesToRecord.ALL_VISIBLE) {
|
||||
Pair(orderedTxs.filter { validatedTransactions.addTransaction(it) }, emptyList())
|
||||
orderedTxs.filter(validatedTransactions::addTransaction) to emptyList()
|
||||
} else {
|
||||
orderedTxs.partition { validatedTransactions.addTransaction(it) }
|
||||
orderedTxs.partition(validatedTransactions::addTransaction)
|
||||
}
|
||||
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
|
||||
if (stateMachineRunId != null) {
|
||||
@ -156,11 +155,55 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
|
||||
val cacheFactory: NamedCacheFactory
|
||||
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
recordTransactions(statesToRecord, txs, validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database)
|
||||
recordTransactions(
|
||||
statesToRecord,
|
||||
txs as? Collection ?: txs.toList(), // We can't change txs to a Collection as it's now part of the public API
|
||||
validatedTransactions,
|
||||
stateMachineRecordedTransactionMapping,
|
||||
vaultService,
|
||||
database
|
||||
)
|
||||
}
|
||||
|
||||
override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver {
|
||||
return DbTransactionsResolver(flow)
|
||||
override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver = DbTransactionsResolver(flow)
|
||||
|
||||
/**
|
||||
* Provides a way to topologically sort SignedTransactions. This means that given any two transactions T1 and T2 in the
|
||||
* list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2.
|
||||
*/
|
||||
private class TopologicalSort {
|
||||
private val forwardGraph = HashMap<SecureHash, MutableSet<SignedTransaction>>()
|
||||
private val transactions = ArrayList<SignedTransaction>()
|
||||
|
||||
/**
|
||||
* Add a transaction to the to-be-sorted set of transactions.
|
||||
*/
|
||||
fun add(stx: SignedTransaction, dependencies: Set<SecureHash>) {
|
||||
dependencies.forEach {
|
||||
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
|
||||
forwardGraph.computeIfAbsent(it) { LinkedHashSet() }.add(stx)
|
||||
}
|
||||
transactions += stx
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the sorted list of signed transactions.
|
||||
*/
|
||||
fun complete(): List<SignedTransaction> {
|
||||
val visited = HashSet<SecureHash>(transactions.size)
|
||||
val result = ArrayList<SignedTransaction>(transactions.size)
|
||||
|
||||
fun visit(transaction: SignedTransaction) {
|
||||
if (visited.add(transaction.id)) {
|
||||
forwardGraph[transaction.id]?.forEach(::visit)
|
||||
result += transaction
|
||||
}
|
||||
}
|
||||
|
||||
transactions.forEach(::visit)
|
||||
|
||||
return result.apply(Collections::reverse)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -196,11 +196,14 @@ open class MockServices private constructor(
|
||||
override var networkParametersService: NetworkParametersService = MockNetworkParametersStorage(networkParameters)
|
||||
override val vaultService: VaultService = makeVaultService(schemaService, persistence, cordappLoader)
|
||||
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
|
||||
ServiceHubInternal.recordTransactions(statesToRecord, txs,
|
||||
ServiceHubInternal.recordTransactions(
|
||||
statesToRecord,
|
||||
txs as? Collection ?: txs.toList(),
|
||||
validatedTransactions as WritableTransactionStorage,
|
||||
mockStateMachineRecordedTransactionMappingStorage,
|
||||
vaultService as VaultServiceInternal,
|
||||
persistence)
|
||||
persistence
|
||||
)
|
||||
}
|
||||
|
||||
override fun jdbcSession(): Connection = persistence.createSession()
|
||||
|
Loading…
Reference in New Issue
Block a user