ENT-3801 Backport to OS (#5355)

* [ENT-3801] Store transactions in the database during transaction resolution (#2305)

* ENT-3801: Store downloaded txns as part of the backchain resolution into the db rather than the checkpoint

It's very inefficient to store the downloaded backchain in the checkpoint as more of it is downloaded. Instead, if a threshold is reached (which currently defaults to 0) then the backchain is stored in the transactions table as unverified. A new is_verified column has been added to track this. Initial testing on the OS codebase has been very promising but unfortunately this current code is not quite ready. I had to quickly port it to ENT as this is meant to be an ENT-only optimisation.

To that effect, there is a TransactionsResolver abstraction with two implementations: an in-memory one which has the old behaviour, and which will be the behaviour for OS, and a db one.

DBTransactionStorage hasn't been fully updated and I had to comment out the optimistic path for now.

Most of these changes will need to be ported to OS to keep the merge conflicts in check, but obviously not DbTransactionsResolver and the "is_verified" changes in DBTransactionStorage. DBTransactionStorage does have other refactoring which will make sense to port though.
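
The resolver split described above can be illustrated with a minimal sketch. It is not the code from this commit: `Tx`, `Row`, `BackchainResolver`, `InMemoryResolver`, `DbResolver` and `demo` are hypothetical names, a plain map stands in for the transactions table, and the threshold mentioned above is left out. The point is only that the in-memory variant keeps every downloaded transaction on the heap (and so in the checkpoint), while the db variant writes each one out flagged as unverified.

```kotlin
// Simplified stand-ins for illustration only; none of these are Corda types.
data class Tx(val id: String, val deps: Set<String>)

// Hypothetical row shape: the stored transaction plus its verification flag.
data class Row(val tx: Tx, val verified: Boolean)

abstract class BackchainResolver {
    /** True if this transaction has already been downloaded or stored. */
    protected abstract fun seen(id: String): Boolean

    /** Keeps hold of a freshly downloaded, not yet verified transaction. */
    protected abstract fun store(tx: Tx)

    /** Number of full transactions kept on the heap, i.e. what a checkpoint would carry. */
    abstract val heldInMemory: Int

    /** Breadth-first walk from [roots], fetching each unseen transaction once. */
    fun download(roots: Set<String>, fetch: (String) -> Tx) {
        var next = roots
        while (next.isNotEmpty()) {
            val fetched = next.filterNot { seen(it) }.map { fetch(it) }
            fetched.forEach { store(it) }
            next = fetched.flatMap { it.deps }.toSet()
        }
    }
}

/** Old behaviour: the whole downloaded backchain stays in flow memory. */
class InMemoryResolver : BackchainResolver() {
    private val downloaded = LinkedHashMap<String, Tx>()
    override fun seen(id: String) = id in downloaded
    override fun store(tx: Tx) { downloaded[tx.id] = tx }
    override val heldInMemory: Int get() = downloaded.size
}

/** New behaviour: each download goes straight to a table, flagged as unverified. */
class DbResolver(private val table: MutableMap<String, Row>) : BackchainResolver() {
    override fun seen(id: String) = id in table
    override fun store(tx: Tx) { table[tx.id] = Row(tx, verified = false) }
    override val heldInMemory: Int get() = 0
}

/** Resolves the same three-transaction chain with both variants and reports what each holds in memory. */
fun demo(): Pair<Int, Int> {
    val chain = listOf(Tx("c", setOf("b")), Tx("b", setOf("a")), Tx("a", emptySet()))
        .associateBy { it.id }
    val inMemory = InMemoryResolver().apply { download(setOf("c")) { chain.getValue(it) } }
    val table = HashMap<String, Row>()
    val db = DbResolver(table).apply { download(setOf("c")) { chain.getValue(it) } }
    return inMemory.heldInMemory to db.heldInMemory
}
```

In this sketch the traversal lives in the base class, so the two variants differ only in where a downloaded transaction ends up.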

* [ENT-3801] Start work on allowing modifications in AppendOnlyPersistentMap

* [ENT-3801] Add transaction resolver tests

* [ENT-3801] Adjust suspendable annotations

* [ENT-3801] Fix the ResolveTransactionsFlow tests

* [ENT-3801] Update ResolveTransactionsFlow tests

* [ENT-3801] Add a liquibase migration script for isVerified

* [ENT-3801] Ensure the migration runs in the correct place

* [ENT-3801] Handle resolution of already present transactions

* [ENT-3801] Fix compile error in performance test app

* [ENT-3801] Logging and comment updates, plus a test case

* [ENT-3801] Add a notary change resolution test

* [ENT-3801] Add a contract upgrade transaction test

* [ENT-3801] Change new column to be a character based status

* [ENT-3801] Migration script type change

* [ENT-3801] Address first round of review comments

* [ENT-3801] Update variable names in AppendOnlyPersistentMap

* [ENT-3801] Another variable name clarification

* [ENT-3801] Fix missing name changes

* [ENT-3801] Make the signature list immutable when constructing cache value

* [ENT-3801] Add a locking strategy for unverified transactions

* [ENT-3801] Address tidying up review comments

* [ENT-3801] First attempt at ensuring locks are released after commit

* [ENT-3801] Remove references to old cache name

* [ENT-3801] Update locking logic

* [ENT-3801] Fix potential deadlock with read/write transaction locks

* [ENT-3801] Remove read locks, and ensure minimal extra suspends

* [ENT-3801] Fix build issues in tests

* [ENT-3801] Use the correct clock when calculating sleep durations

* [ENT-3801] Add a pessimism flag for writing verified transactions

* [ENT-3801] Change logging statement to debug

(cherry picked from commit 8ab6a55e17)

* [NOTICK] Fix up imports for some changed files

* [NOTICK] Fix transaction resolution tests

* [NOTICK] Reinstate the DBTransactionsResolver

* [NOTICK] Add the topological sort back to recordTransactions

* [NOTICK] Adjust test case to remove dependency on query ordering

* [NOTICK] Make test code match that in ENT
This commit is contained in:
James Higgs
2019-08-07 16:49:49 +01:00
committed by Rick Parker
parent 39094f1918
commit 44428b6048
25 changed files with 1184 additions and 517 deletions


@@ -77,7 +77,8 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
Result(fromDisk, emptyList())
val loadedFromDisk = loadExpected(fromDisk)
Result(loadedFromDisk, emptyList())
} else {
logger.debug { "Requesting ${toFetch.size} dependency(s) for verification from ${otherSideSession.counterparty.name}" }
@@ -99,7 +100,9 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch)
logger.debug { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}" }
maybeWriteToDisk(downloaded)
Result(fromDisk, downloaded)
// Re-load items already present before the download procedure. This ensures these objects are not unnecessarily checkpointed.
val loadedFromDisk = loadExpected(fromDisk)
Result(loadedFromDisk, downloaded)
}
}
@@ -107,19 +110,29 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
// Do nothing by default.
}
private fun loadWhatWeHave(): Pair<List<T>, List<SecureHash>> {
val fromDisk = ArrayList<T>()
private fun loadWhatWeHave(): Pair<List<SecureHash>, List<SecureHash>> {
val fromDisk = ArrayList<SecureHash>()
val toFetch = ArrayList<SecureHash>()
for (txid in requests) {
val stx = load(txid)
if (stx == null)
toFetch += txid
else
fromDisk += stx
// Although the full object is loaded here, only return the id. This prevents the full set of objects already present from
// being checkpointed every time a request is made to download an object the node does not yet have.
fromDisk += txid
}
return Pair(fromDisk, toFetch)
}
private fun loadExpected(ids: List<SecureHash>): List<T> {
val loaded = ids.mapNotNull { load(it) }
require(ids.size == loaded.size) {
"Expected to find ${ids.size} items in database but only found ${loaded.size} items"
}
return loaded
}
protected abstract fun load(txid: SecureHash): T?
protected open fun convert(wire: W): T = uncheckedCast(wire)
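
The change to `loadWhatWeHave` and the new `loadExpected` above follow a pattern that can be shown in isolation. The sketch below is an illustration under assumptions, not the flow itself: `Item`, `Store` and `demo` are hypothetical, ids are plain strings, and the download step is only indicated by a comment. What it shows is that only id lists are kept while waiting for the download, and the full objects are re-read afterwards with a size check.

```kotlin
// Hypothetical stand-ins for illustration; not Corda types.
data class Item(val id: String, val payload: String)

class Store(private val items: Map<String, Item>) {
    fun load(id: String): Item? = items[id]
}

/** Splits [requests] into ids already in the store and ids still to fetch. Only ids are returned. */
fun loadWhatWeHave(requests: List<String>, store: Store): Pair<List<String>, List<String>> =
    requests.partition { store.load(it) != null }

/** Re-reads the full objects for [ids], failing loudly if any of them is no longer found. */
fun loadExpected(ids: List<String>, store: Store): List<Item> {
    val loaded = ids.mapNotNull { store.load(it) }
    require(ids.size == loaded.size) {
        "Expected to find ${ids.size} items but only found ${loaded.size}"
    }
    return loaded
}

fun demo(): List<Item> {
    val store = Store(mapOf("a" to Item("a", "already stored")))
    val (fromDisk, toFetch) = loadWhatWeHave(listOf("a", "b"), store)
    // "b" would be downloaded at this point; only the two small id lists are live across that wait.
    check(toFetch == listOf("b"))
    return loadExpected(fromDisk, store)
}
```

The cost of this design is a second read of the items that were already present, traded against not carrying them through every suspension.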


@@ -2,37 +2,28 @@ package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DeleteForDJVM
import net.corda.core.contracts.TransactionResolutionException
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.exactAdd
import java.util.*
import kotlin.collections.ArrayList
import kotlin.math.min
// TODO: This code is currently unit tested by TwoPartyTradeFlowTests, it should have its own tests.
/**
* Resolves transactions for the specified [txHashes] along with their full history (dependency graph) from [otherSide].
* Each retrieved transaction is validated and inserted into the local transaction storage.
*/
@DeleteForDJVM
class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
private val otherSide: FlowSession,
private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic<Unit>() {
class ResolveTransactionsFlow private constructor(
val initialTx: SignedTransaction?,
val txHashes: Set<SecureHash>,
val otherSide: FlowSession,
val statesToRecord: StatesToRecord
) : FlowLogic<Unit>() {
// Need it ordered in terms of iteration. Needs to be a variable for the check-pointing logic to work.
private val txHashes = txHashesArg.toList()
/** Transaction to fetch attachments for. */
private var signedTransaction: SignedTransaction? = null
constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
: this(null, txHashes, otherSide, statesToRecord)
/**
* Resolves and validates the dependencies of the specified [SignedTransaction]. Fetches the attachments, but does
@@ -40,163 +31,74 @@ class ResolveTransactionsFlow(txHashesArg: Set<SecureHash>,
*
* @return a list of verified [SignedTransaction] objects, in a depth-first order.
*/
constructor(signedTransaction: SignedTransaction, otherSide: FlowSession) : this(dependencyIDs(signedTransaction), otherSide) {
this.signedTransaction = signedTransaction
}
constructor(transaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
: this(transaction, transaction.dependencies, otherSide, statesToRecord)
constructor(signedTransaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord) : this(dependencyIDs(signedTransaction), otherSide, statesToRecord) {
this.signedTransaction = signedTransaction
}
@DeleteForDJVM
companion object {
private fun dependencyIDs(stx: SignedTransaction) = stx.inputs.map { it.txhash }.toSet() + stx.references.map { it.txhash }.toSet()
private const val RESOLUTION_PAGE_SIZE = 100
/** Topologically sorts the given transactions such that dependencies are listed before dependers. */
@JvmStatic
fun topologicalSort(transactions: Collection<SignedTransaction>): List<SignedTransaction> {
val sort = TopologicalSort()
for (tx in transactions) {
sort.add(tx)
}
return sort.complete()
}
}
class ExcessivelyLargeTransactionGraph : FlowException()
// TODO: Figure out a more appropriate DOS limit here, 5000 is simply a very bad guess.
/** The maximum number of transactions this flow will try to download before bailing out. */
var transactionCountLimit = 5000
set(value) {
require(value > 0) { "$value is not a valid count limit" }
field = value
}
private var fetchNetParamsFromCounterpart = false
@Suspendable
@Throws(FetchDataFlow.HashNotFound::class, FetchDataFlow.IllegalTransactionRequest::class)
override fun call() {
val counterpartyPlatformVersion = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion
?: throw FlowException("Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache")
val newTxns = ArrayList<SignedTransaction>(txHashes.size)
// Start fetching data.
for (pageNumber in 0..(txHashes.size - 1) / RESOLUTION_PAGE_SIZE) {
val page = page(pageNumber, RESOLUTION_PAGE_SIZE)
newTxns += downloadDependencies(page)
val txsWithMissingAttachments = if (pageNumber == 0) signedTransaction?.let { newTxns + it }
?: newTxns else newTxns
fetchMissingAttachments(txsWithMissingAttachments)
// Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters
// request to node V3 that doesn't know about this protocol.
if (counterpartyPlatformVersion >= 4) {
fetchMissingParameters(txsWithMissingAttachments)
}
// TODO This error should actually cause the flow to be sent to the flow hospital to be retried
val counterpartyPlatformVersion = checkNotNull(serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion) {
"Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache"
}
otherSide.send(FetchDataFlow.Request.End)
// Finish fetching data.
// Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters
// request to node V3 that doesn't know about this protocol.
fetchNetParamsFromCounterpart = counterpartyPlatformVersion >= 4
if (initialTx != null) {
fetchMissingAttachments(initialTx)
fetchMissingNetworkParameters(initialTx)
}
val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
resolver.downloadDependencies()
otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
val result = topologicalSort(newTxns)
// If transaction resolution is performed for a transaction where some states are relevant, then those should be
// recorded if this has not already occurred.
val usedStatesToRecord = if (statesToRecord == StatesToRecord.NONE) StatesToRecord.ONLY_RELEVANT else statesToRecord
result.forEach {
// For each transaction, verify it and insert it into the database. As we are iterating over them in a
// depth-first order, we should not encounter any verification failures due to missing data. If we fail
// half way through, it's no big deal, although it might result in us attempting to re-download data
// redundantly next time we attempt verification.
it.verify(serviceHub)
serviceHub.recordTransactions(usedStatesToRecord, listOf(it))
}
}
private fun page(pageNumber: Int, pageSize: Int): Set<SecureHash> {
val offset = pageNumber * pageSize
val limit = min(offset + pageSize, txHashes.size)
// call toSet() is needed because sub-lists are not checkpoint-friendly.
return txHashes.subList(offset, limit).toSet()
}
@Suspendable
// TODO use paging here (we literally get the entire dependencies graph in memory)
private fun downloadDependencies(depsToCheck: Set<SecureHash>): List<SignedTransaction> {
// Maintain a work queue of all hashes to load/download, initialised with our starting set. Then do a breadth
// first traversal across the dependency graph.
//
// TODO: This approach has two problems. Analyze and resolve them:
//
// (1) This flow leaks private data. If you download a transaction and then do NOT request a
// dependency, it means you already have it, which in turn means you must have been involved with it before
// somehow, either in the tx itself or in any following spend of it. If there were no following spends, then
// your peer knows for sure that you were involved ... this is bad! The only obvious ways to fix this are
// something like onion routing of requests, secure hardware, or both.
//
// (2) If the identity service changes the assumed identity of one of the public keys, it's possible
// that the "tx in db is valid" invariant is violated if one of the contracts checks the identity! Should
// the db contain the identities that were resolved when the transaction was first checked, or should we
// accept this kind of change is possible? Most likely solution is for identity data to be an attachment.
val nextRequests = LinkedHashSet<SecureHash>() // Keep things unique but ordered, for unit test stability.
nextRequests.addAll(depsToCheck)
val resultQ = LinkedHashMap<SecureHash, SignedTransaction>()
val limit = transactionCountLimit
var limitCounter = 0
while (nextRequests.isNotEmpty()) {
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
// graph we're traversing.
val notAlreadyFetched: Set<SecureHash> = nextRequests - resultQ.keys
nextRequests.clear()
if (notAlreadyFetched.isEmpty()) // Done early.
break
// Request the standalone transaction data (which may refer to things we don't yet have).
// TODO use paging here
val downloads: List<SignedTransaction> = subFlow(FetchTransactionsFlow(notAlreadyFetched, otherSide)).downloaded
for (stx in downloads)
check(resultQ.putIfAbsent(stx.id, stx) == null) // Assert checks the filter at the start.
// Add all input states and reference input states to the work queue.
val inputHashes = downloads.flatMap { it.inputs + it.references }.map { it.txhash }
nextRequests.addAll(inputHashes)
limitCounter = limitCounter exactAdd nextRequests.size
if (limitCounter > limit)
throw ExcessivelyLargeTransactionGraph()
}
return resultQ.values.toList()
resolver.recordDependencies(usedStatesToRecord)
}
/**
* Returns a list of all the dependencies of the given transactions, deepest first i.e. the last downloaded comes
* first in the returned list and thus doesn't have any unverified dependencies.
* Fetches the set of attachments required to verify the given transaction. If these are not already present, they will be fetched from
* a remote peer.
*
* @param transaction The transaction to fetch attachments for
* @return True if any attachments were fetched from a remote peer, false otherwise
*/
// TODO: This could be done in parallel with other fetches for extra speed.
@Suspendable
private fun fetchMissingAttachments(downloads: List<SignedTransaction>) {
val attachments = downloads.map(SignedTransaction::coreTransaction).flatMap { tx ->
when (tx) {
is WireTransaction -> tx.attachments
is ContractUpgradeWireTransaction -> listOf(tx.legacyContractAttachmentId, tx.upgradedContractAttachmentId)
else -> emptyList()
}
fun fetchMissingAttachments(transaction: SignedTransaction): Boolean {
val tx = transaction.coreTransaction
val attachmentIds = when (tx) {
is WireTransaction -> tx.attachments.toSet()
is ContractUpgradeWireTransaction -> setOf(tx.legacyContractAttachmentId, tx.upgradedContractAttachmentId)
else -> return false
}
val missingAttachments = attachments.filter { serviceHub.attachments.openAttachment(it) == null }
if (missingAttachments.isNotEmpty())
subFlow(FetchAttachmentsFlow(missingAttachments.toSet(), otherSide))
val downloads = subFlow(FetchAttachmentsFlow(attachmentIds, otherSide)).downloaded
return (downloads.isNotEmpty())
}
/**
* Fetches the network parameters under which the given transaction was created. Note that if the transaction was created pre-V4, or if
* the counterparty does not understand that network parameters may need to be fetched, no parameters will be requested.
*
* @param transaction The transaction to fetch the network parameters for, if the parameters are not already present
* @return True if the network parameters were fetched from a remote peer, false otherwise
*/
// TODO This can also be done in parallel. See comment to [fetchMissingAttachments] above.
@Suspendable
private fun fetchMissingParameters(downloads: List<SignedTransaction>) {
val parameters = downloads.mapNotNull { it.networkParametersHash }
val missingParameters = parameters.filter { !(serviceHub.networkParametersService as NetworkParametersStorage).hasParameters(it) }
if (missingParameters.isNotEmpty())
subFlow(FetchNetworkParametersFlow(missingParameters.toSet(), otherSide))
fun fetchMissingNetworkParameters(transaction: SignedTransaction): Boolean {
return if (fetchNetParamsFromCounterpart) {
transaction.networkParametersHash?.let {
val downloads = subFlow(FetchNetworkParametersFlow(setOf(it), otherSide)).downloaded
downloads.isNotEmpty()
} ?: false
} else {
false
}
}
}


@@ -0,0 +1,17 @@
package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
// TODO: This should really be called ServiceHubInternal but that name is already taken by net.corda.node.services.api.ServiceHubInternal.
interface ServiceHubCoreInternal : ServiceHub {
fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver
}
interface TransactionsResolver {
@Suspendable
fun downloadDependencies()
fun recordDependencies(usedStatesToRecord: StatesToRecord)
}


@@ -1,118 +0,0 @@
package net.corda.core.internal
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.transactions.SignedTransaction
import rx.Observable
/**
* Provides a way to topologically sort SignedTransactions. This means that given any two transactions T1 and T2 in the
* list returned by [complete] if T1 is a dependency of T2 then T1 will occur earlier than T2.
*/
class TopologicalSort {
private val forwardGraph = HashMap<SecureHash, LinkedHashSet<SignedTransaction>>()
private val transactions = ArrayList<SignedTransaction>()
/**
* Add a transaction to the to-be-sorted set of transactions.
*/
fun add(stx: SignedTransaction) {
val stateRefs = stx.references + stx.inputs
stateRefs.forEach { (txhash) ->
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is).
forwardGraph.getOrPut(txhash) { LinkedHashSet() }.add(stx)
}
transactions.add(stx)
}
/**
* Return the sorted list of signed transactions.
*/
fun complete(): List<SignedTransaction> {
val visited = HashSet<SecureHash>(transactions.size)
val result = ArrayList<SignedTransaction>(transactions.size)
fun visit(transaction: SignedTransaction) {
if (transaction.id !in visited) {
visited.add(transaction.id)
forwardGraph[transaction.id]?.forEach(::visit)
result.add(transaction)
}
}
transactions.forEach(::visit)
return result.reversed()
}
}
private fun getOutputStateRefs(stx: SignedTransaction): List<StateRef> {
return stx.coreTransaction.outputs.mapIndexed { i, _ -> StateRef(stx.id, i) }
}
/**
* Topologically sort a SignedTransaction Observable on the fly by buffering transactions until all dependencies are met.
* @param initialUnspentRefs the list of unspent references that may be spent by transactions in the observable. This is
* the initial set of references the sort uses to decide whether to buffer transactions or not. For example if this
* is empty then the Observable should start with issue transactions that don't have inputs.
*/
fun Observable<SignedTransaction>.topologicalSort(initialUnspentRefs: Collection<StateRef>): Observable<SignedTransaction> {
data class State(
val unspentRefs: HashSet<StateRef>,
val bufferedTopologicalSort: TopologicalSort,
val bufferedInputs: HashSet<StateRef>,
val bufferedOutputs: HashSet<StateRef>
)
var state = State(
unspentRefs = HashSet(initialUnspentRefs),
bufferedTopologicalSort = TopologicalSort(),
bufferedInputs = HashSet(),
bufferedOutputs = HashSet()
)
return concatMapIterable { stx ->
val results = ArrayList<SignedTransaction>()
if (state.unspentRefs.containsAll(stx.inputs)) {
// Dependencies are satisfied
state.unspentRefs.removeAll(stx.inputs)
state.unspentRefs.addAll(getOutputStateRefs(stx))
results.add(stx)
} else {
// Dependencies are not satisfied, buffer
state.bufferedTopologicalSort.add(stx)
state.bufferedInputs.addAll(stx.inputs)
for (outputRef in getOutputStateRefs(stx)) {
if (!state.bufferedInputs.remove(outputRef)) {
state.bufferedOutputs.add(outputRef)
}
}
for (inputRef in stx.inputs) {
if (!state.bufferedOutputs.remove(inputRef)) {
state.bufferedInputs.add(inputRef)
}
}
}
if (state.unspentRefs.containsAll(state.bufferedInputs)) {
// Buffer satisfied
results.addAll(state.bufferedTopologicalSort.complete())
state.unspentRefs.removeAll(state.bufferedInputs)
state.unspentRefs.addAll(state.bufferedOutputs)
state = State(
unspentRefs = state.unspentRefs,
bufferedTopologicalSort = TopologicalSort(),
bufferedInputs = HashSet(),
bufferedOutputs = HashSet()
)
results
} else {
// Buffer not satisfied
state = State(
unspentRefs = state.unspentRefs,
bufferedTopologicalSort = state.bufferedTopologicalSort,
bufferedInputs = state.bufferedInputs,
bufferedOutputs = state.bufferedOutputs
)
results
}
}
}
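
The ordering guarantee of the `TopologicalSort` class above (dependencies before dependers) can be checked on a small example. This is a simplified sketch of the same depth-first approach, assuming a hypothetical `Node` type with string ids in place of `SignedTransaction`; it is not the class from this file.

```kotlin
// Hypothetical stand-in for a transaction: an id plus the ids it depends on.
data class Node(val id: String, val deps: Set<String>)

/** Returns [nodes] ordered so that every node comes after all of its dependencies in the list. */
fun topologicalSort(nodes: List<Node>): List<Node> {
    // Forward graph: dependency id -> nodes that depend on it.
    // LinkedHashSet keeps the traversal deterministic for a given input order.
    val forward = HashMap<String, LinkedHashSet<Node>>()
    for (node in nodes) {
        for (dep in node.deps) {
            forward.getOrPut(dep) { LinkedHashSet() }.add(node)
        }
    }

    val visited = HashSet<String>()
    val result = ArrayList<Node>(nodes.size)

    // Post-order visit: a node is appended only after everything that depends on it.
    fun visit(node: Node) {
        if (visited.add(node.id)) {
            forward[node.id]?.forEach { visit(it) }
            result.add(node)
        }
    }
    nodes.forEach { visit(it) }

    // Reversing the post-order puts dependencies first.
    return result.reversed()
}

fun demo(): List<String> {
    val a = Node("a", emptySet())
    val b = Node("b", setOf("a"))
    val c = Node("c", setOf("a", "b"))
    // Input deliberately lists dependers before their dependencies.
    return topologicalSort(listOf(c, b, a)).map { it.id }
}
```

Dependencies that are not in the input list are simply ignored by this sketch, since nothing is ever visited for them.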


@@ -228,4 +228,7 @@ fun isAttachmentTrusted(attachment: Attachment, service: AttachmentStorage?): Bo
} else {
false
}
}
}
val SignedTransaction.dependencies: Set<SecureHash>
get() = (inputs.asSequence() + references.asSequence()).map { it.txhash }.toSet()
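
As a small illustration of what the `dependencies` extension property above computes, here is a sketch on hypothetical stand-in types (`Ref` and `SimpleTx` are assumptions, with string ids in place of `SecureHash`). It shows that several state references into the same transaction collapse into a single dependency id.

```kotlin
// Hypothetical, simplified stand-ins for StateRef and SignedTransaction.
data class Ref(val txhash: String, val index: Int)
data class SimpleTx(val inputs: List<Ref>, val references: List<Ref>)

/** Ids of every transaction that produced an input or reference state of this one. */
val SimpleTx.dependencies: Set<String>
    get() = (inputs.asSequence() + references.asSequence()).map { it.txhash }.toSet()

fun demo(): Set<String> {
    val tx = SimpleTx(
        inputs = listOf(Ref("t1", 0), Ref("t1", 1), Ref("t2", 0)),
        references = listOf(Ref("t2", 3), Ref("t3", 0))
    )
    return tx.dependencies
}
```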