From a4cada4a2efe4bb1e8c3f8d5a5f2ae13d01dde7c Mon Sep 17 00:00:00 2001 From: Nick Dunstone <49945179+nickdunstone13@users.noreply.github.com> Date: Tue, 28 Jan 2020 09:04:18 +0000 Subject: [PATCH] ENT-4474 O/S Changes to support bulk backchain fetching (#5894) * Ongoing dev multi backchain fetch * Final dev pre logging message improvements * Trace messages * Trace messages * Code tidy up * Code review comments * Code review comments * Whitespace removed * Code review changes * Code reformatting --- .../corda/core/flows/SendTransactionFlow.kt | 125 +++++++++++++++++- .../net/corda/core/internal/FetchDataFlow.kt | 107 ++++++++++++--- .../core/internal/ResolveTransactionsFlow.kt | 9 +- .../core/internal/ServiceHubCoreInternal.kt | 2 +- .../node/services/DbTransactionsResolver.kt | 13 +- 5 files changed, 232 insertions(+), 24 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt index b9a9b5b480..233a89236b 100644 --- a/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/SendTransactionFlow.kt @@ -1,11 +1,62 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.NamedByHash import net.corda.core.contracts.StateAndRef import net.corda.core.crypto.SecureHash import net.corda.core.internal.* +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.unwrap +import net.corda.core.utilities.trace + +/** + * In the words of Matt working code is more important then pretty code. This class that contains code that may + * be serialized. If it were always serialized then the local disk fetch would need to serialize then de-serialize + * which wastes time. However over the wire we get batch fetch items serialized. This is because we need to get the exact + * length of the objects to pack them into the 10MB max message size buffer. We do not want to serialize them multiple times + * so it's a lot more efficient to send the byte stream. + */ +@CordaSerializable +class MaybeSerializedSignedTransaction(override val id: SecureHash, val serialized: SerializedBytes?, + val nonSerialised: SignedTransaction?) : NamedByHash { + init { + check(serialized == null || nonSerialised == null) { + "MaybeSerializedSignedTransaction: Serialized and non-serialized may not both be non-null." + } + } + + fun get(): SignedTransaction? { + return if (nonSerialised != null) { + nonSerialised + } else if (serialized != null) { + val tranBytes = SerializedBytes(serialized.bytes) + tranBytes.deserialize() + } else { + null + } + } + + fun isNull(): Boolean { + return serialized == null && nonSerialised == null + } + + fun serializedByteCount(): Int { + return serialized?.bytes?.size ?: 0 + } + + fun payloadContentDescription(): String { + val tranSize = serializedByteCount() + val isSer = serialized != null + val isObj = nonSerialised != null + return if (isNull()) { + "" + } else "size = $tranSize, serialized = $isSer, isObj = $isObj" + } +} /** * The [SendTransactionFlow] should be used to send a transaction to another peer that wishes to verify that transaction's @@ -40,6 +91,11 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) @Suspendable override fun call(): Void? { + val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize + val maxPayloadSize = networkMaxMessageSize / 2 + + logger.trace { "DataVendingFlow: Call: Network max message size = $networkMaxMessageSize, Max Payload Size = $maxPayloadSize" } + // The first payload will be the transaction data, subsequent payload will be the transaction/attachment/network parameters data. var payload = payload @@ -64,20 +120,33 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) // This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need // to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of // data request. + var loopCount = 0 while (true) { + val loopCnt = loopCount++ + logger.trace { "DataVendingFlow: Main While [$loopCnt]..." } val dataRequest = sendPayloadAndReceiveDataRequest(otherSideSession, payload).unwrap { request -> + logger.trace { "sendPayloadAndReceiveDataRequest(): ${request.javaClass.name}" } when (request) { is FetchDataFlow.Request.Data -> { // Security TODO: Check for abnormally large or malformed data requests verifyDataRequest(request) request } - FetchDataFlow.Request.End -> return null + FetchDataFlow.Request.End -> { + logger.trace { "DataVendingFlow: END" } + return null + } } } + logger.trace { "Sending data (Type = ${dataRequest.dataType.name})" } + var totalByteCount = 0 + var firstItem = true + var batchFetchCountExceeded = false + var numSent = 0 payload = when (dataRequest.dataType) { FetchDataFlow.DataType.TRANSACTION -> dataRequest.hashes.map { txId -> + logger.trace { "Sending: TRANSACTION (dataRequest.hashes.size=${dataRequest.hashes.size})" } if (!authorisedTransactions.isAuthorised(txId)) { throw FetchDataFlow.IllegalTransactionRequest(txId) } @@ -85,17 +154,71 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any) ?: throw FetchDataFlow.HashNotFound(txId) authorisedTransactions.removeAuthorised(tx.id) authorisedTransactions.addAuthorised(getInputTransactions(tx)) + totalByteCount += tx.txBits.size + numSent++ tx } + // Loop on all items returned using dataRequest.hashes.map: + FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId -> + if (!authorisedTransactions.isAuthorised(txId)) { + throw FetchDataFlow.IllegalTransactionRequest(txId) + } + // Maybe we should not just throw here as it's not recoverable on the client side. Might be better to send a reason code or + // remove the restriction on sending once. + logger.trace { "Transaction authorised OK: '$txId'" } + var serialized: SerializedBytes? = null + if (!batchFetchCountExceeded) { + // Only fetch and serialize if we have not already exceeded the maximum byte count. Once we have, no more fetching + // is required, just reject all additional items. + val tx = serviceHub.validatedTransactions.getTransaction(txId) + ?: throw FetchDataFlow.HashNotFound(txId) + logger.trace { "Transaction get OK: '$txId'" } + serialized = tx.serialize() + + val itemByteCount = serialized.size + logger.trace { "Batch-Send '$txId': first = $firstItem, Total bytes = $totalByteCount, Item byte count = $itemByteCount, Maximum = $maxPayloadSize" } + if (firstItem || (totalByteCount + itemByteCount) < maxPayloadSize) { + totalByteCount += itemByteCount + numSent++ + // Always include at least one item else if the max is set too low nothing will ever get returned. + // Splitting items will be a separate Jira if need be + authorisedTransactions.removeAuthorised(tx.id) + authorisedTransactions.addAuthorised(getInputTransactions(tx)) + logger.trace { "Adding item to return set: '$txId'" } + } else { + logger.trace { "Fetch block size EXCEEDED at '$txId'." } + batchFetchCountExceeded = true + } + } // end + + if (batchFetchCountExceeded) { + logger.trace { "Excluding '$txId' from return set due to exceeded count." } + } + + // Send null if limit is exceeded + val maybeserialized = MaybeSerializedSignedTransaction(txId, if (batchFetchCountExceeded) { + null + } else { + serialized + }, null) + firstItem = false + maybeserialized + } // Batch response loop end FetchDataFlow.DataType.ATTACHMENT -> dataRequest.hashes.map { + logger.trace { "Sending: Attachments for '$it'" } serviceHub.attachments.openAttachment(it)?.open()?.readFully() ?: throw FetchDataFlow.HashNotFound(it) } FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map { + logger.trace { "Sending: Parameters for '$it'" } (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it) ?: throw FetchDataFlow.MissingNetworkParameters(it) } + FetchDataFlow.DataType.UNKNOWN -> dataRequest.hashes.map { + logger.warn("Message from from a future version of Corda with UNKNOWN enum value for FetchDataFlow.DataType: ID='$it'") + } } + logger.trace { "Block total size = $totalByteCount: Num Items = ($numSent of ${dataRequest.hashes.size} total)" } } } diff --git a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt index 9959f82804..3f05c782cb 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FetchDataFlow.kt @@ -8,10 +8,13 @@ import net.corda.core.crypto.sha256 import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession +import net.corda.core.flows.MaybeSerializedSignedTransaction import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch import net.corda.core.internal.FetchDataFlow.HashNotFound import net.corda.core.node.NetworkParameters import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.CordaSerializationTransformEnumDefault +import net.corda.core.serialization.CordaSerializationTransformEnumDefaults import net.corda.core.serialization.SerializationToken import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsTokenContext @@ -20,6 +23,7 @@ import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.debug import net.corda.core.utilities.unwrap +import net.corda.core.utilities.trace import java.nio.file.FileAlreadyExistsException import java.util.* @@ -39,6 +43,7 @@ import java.util.* * @param T The ultimate type of the data being fetched. * @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type. */ + sealed class FetchDataFlow( protected val requests: Set, protected val otherSideSession: FlowSession, @@ -54,7 +59,8 @@ sealed class FetchDataFlow( class MissingNetworkParameters(val requested: SecureHash) : FlowException("Failed to fetch network parameters with hash: $requested") - class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction (${requested}) that is not in the transitive dependency graph of the sent transaction.") + class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction ($requested)" + + " that is not in the transitive dependency graph of the sent transaction.") @CordaSerializable data class Result(val fromDisk: List, val downloaded: List) @@ -65,9 +71,18 @@ sealed class FetchDataFlow( object End : Request() } + // https://docs.corda.net/serialization-enum-evolution.html + // Below annotations added to map two new enum values (BATCH_TRANSACTION and UNKNOWN) onto TRANSACTION. The effect of this is that + // if a that does not have these enum values receives it will not throw an error during deserialization. The purpose of adding + // UNKNOWN is such that future additions can default to UNKNOWN rather than an existing value. In this instance we are protecting + // against not having unknown by using the platform version as a guard. + @CordaSerializationTransformEnumDefaults( + CordaSerializationTransformEnumDefault("BATCH_TRANSACTION", "TRANSACTION"), + CordaSerializationTransformEnumDefault("UNKNOWN", "TRANSACTION") + ) @CordaSerializable enum class DataType { - TRANSACTION, ATTACHMENT, PARAMETERS + TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN } @Suspendable @@ -77,9 +92,11 @@ sealed class FetchDataFlow( val (fromDisk, toFetch) = loadWhatWeHave() return if (toFetch.isEmpty()) { + logger.trace { "FetchDataFlow.call(): loadWhatWeHave(): From disk size = ${fromDisk.size}: No items to fetch." } val loadedFromDisk = loadExpected(fromDisk) Result(loadedFromDisk, emptyList()) } else { + logger.trace { "FetchDataFlow.call(): loadWhatWeHave(): From disk size = ${fromDisk.size}, To-fetch size = ${toFetch.size}" } logger.debug { "Requesting ${toFetch.size} dependency(s) for verification from ${otherSideSession.counterparty.name}" } // TODO: Support "large message" response streaming so response sizes are not limited by RAM. @@ -89,17 +106,27 @@ sealed class FetchDataFlow( // configured Artemis to not fragment messages up to 10mb so we can send 10mb messages without problems. // Above that, we start losing authentication data on the message fragments and take exceptions in the // network layer. - val maybeItems = ArrayList(toFetch.size) - for (hash in toFetch) { + val maybeItems = ArrayList() + if (toFetch.size == 1) { + val hash = toFetch.single() // We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse. - // The only thing checked is the object type. It is a protocol violation to send results out of order. + // The only thing checked is the object type. // TODO We need to page here after large messages will work. + logger.trace { "[Single fetch]: otherSideSession.sendAndReceive($hash): Fetch type: ${dataType.name}" } + // should only pass single item dataType below. maybeItems += otherSideSession.sendAndReceive>(Request.Data(NonEmptySet.of(hash), dataType)).unwrap { it } + } else { + logger.trace { "[Batch fetch]: otherSideSession.sendAndReceive(set of ${toFetch.size}): Fetch type: ${dataType.name})" } + maybeItems += otherSideSession.sendAndReceive>(Request.Data(NonEmptySet.copyOf(toFetch), dataType)) + .unwrap { it } + logger.trace { "[Batch fetch]: otherSideSession.sendAndReceive Done: count= ${maybeItems.size})" } } + // Check for a buggy/malicious peer answering with something that we didn't ask for. val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch) - logger.debug { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}" } + logger.trace { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}, maybeItems.size = ${maybeItems.size}" } maybeWriteToDisk(downloaded) + // Re-load items already present before the download procedure. This ensures these objects are not unnecessarily checkpointed. val loadedFromDisk = loadExpected(fromDisk) Result(loadedFromDisk, downloaded) @@ -110,9 +137,9 @@ sealed class FetchDataFlow( // Do nothing by default. } - private fun loadWhatWeHave(): Pair, List> { + private fun loadWhatWeHave(): Pair, Set> { val fromDisk = ArrayList() - val toFetch = ArrayList() + val toFetch = LinkedHashSet() for (txid in requests) { val stx = load(txid) if (stx == null) @@ -137,18 +164,52 @@ sealed class FetchDataFlow( protected open fun convert(wire: W): T = uncheckedCast(wire) + @Suppress("ComplexMethod") private fun validateFetchResponse(maybeItems: UntrustworthyData>, - requests: List): List { + requests: Set): List { return maybeItems.unwrap { response -> - if (response.size != requests.size) - throw DownloadedVsRequestedSizeMismatch(requests.size, response.size) + logger.trace { "validateFetchResponse(): Response size = ${response.size}, Request size = ${requests.size}" } + if (response.size != requests.size) { + logger.trace { "maybeItems.unwrap: RespType Response.size (${requests.size}) != requests.size (${response.size})" } + throw FetchDataFlow.DownloadedVsRequestedSizeMismatch(requests.size, response.size) + } + + if (logger.isTraceEnabled()) { + logger.trace { "Request size = ${requests.size}" } + for ((reqInd, req) in requests.withIndex()) { + logger.trace { "Requested[$reqInd] = '$req'" } + } + } + val answers = response.map { convert(it) } + if (logger.isTraceEnabled()) { + logger.trace { "Answers size = ${answers.size}" } + for ((respInd, item) in answers.withIndex()) { + if (item is MaybeSerializedSignedTransaction) { + logger.trace { "ValidateItem[$respInd]: '${item.id}': Type = MaybeSerializedSignedTransaction: ${item.payloadContentDescription()}" } + } else { + logger.trace("ValidateItem[$respInd]: Type = ${item.javaClass.name}") + } + } + } + // Check transactions actually hash to what we requested, if this fails the remote node // is a malicious flow violator or buggy. - for ((index, item) in answers.withIndex()) { - if (item.id != requests[index]) - throw DownloadedVsRequestedDataMismatch(requests[index], item.id) + var badDataIndex = -1 + var badDataId: SecureHash? = null + for ((index, item) in requests.withIndex()) { + if (item != answers[index].id) { + badDataIndex = index + badDataId = item + logger.info("Will Throw on DownloadedVsRequestedDataMismatch(Req item = '$item', Resp item = '${answers[index].id}'") + } } + + if (badDataIndex >= 0 && badDataId != null) { + logger.error("Throwing DownloadedVsRequestedDataMismatch due to bad verification on: ID = $badDataId, Answer[$badDataIndex]='${answers[badDataIndex].id}'") + throw DownloadedVsRequestedDataMismatch(badDataId, answers[badDataIndex].id) + } + answers } } @@ -212,13 +273,27 @@ class FetchTransactionsFlow(requests: Set, otherSide: FlowSession) : override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid) } +class FetchBatchTransactionsFlow(requests: Set, otherSide: FlowSession) : + FetchDataFlow(requests, otherSide, DataType.BATCH_TRANSACTION) { + + override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? { + val tran = serviceHub.validatedTransactions.getTransaction(txid) + return if (tran == null) { + null + } else { + MaybeSerializedSignedTransaction(txid, null, tran) + } + } +} + /** * Given a set of hashes either loads from local network parameters storage or requests them from the other peer. Downloaded * network parameters are saved to local parameters storage automatically. This flow can be used only if the minimumPlatformVersion is >= 4. * Nodes on lower versions won't respond to this flow. */ class FetchNetworkParametersFlow(requests: Set, - otherSide: FlowSession) : FetchDataFlow, SignedDataWithCert>(requests, otherSide, DataType.PARAMETERS) { + otherSide: FlowSession) : FetchDataFlow, + SignedDataWithCert>(requests, otherSide, DataType.PARAMETERS) { override fun load(txid: SecureHash): SignedDataWithCert? { return (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(txid) } @@ -235,4 +310,4 @@ class FetchNetworkParametersFlow(requests: Set, } } } -} \ No newline at end of file +} diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt index 7136f9f353..40df9502b5 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt @@ -9,6 +9,8 @@ import net.corda.core.node.StatesToRecord import net.corda.core.transactions.ContractUpgradeWireTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction +import net.corda.core.utilities.debug +import net.corda.core.utilities.trace /** * Resolves transactions for the specified [txHashes] along with their full history (dependency graph) from [otherSide]. @@ -36,15 +38,19 @@ class ResolveTransactionsFlow private constructor( private var fetchNetParamsFromCounterpart = false + @Suppress("MagicNumber") @Suspendable override fun call() { // TODO This error should actually cause the flow to be sent to the flow hospital to be retried val counterpartyPlatformVersion = checkNotNull(serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion) { "Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache" } + // Fetch missing parameters flow was added in version 4. This check is needed so we don't end up with node V4 sending parameters // request to node V3 that doesn't know about this protocol. fetchNetParamsFromCounterpart = counterpartyPlatformVersion >= 4 + val batchMode = counterpartyPlatformVersion >= 6 + logger.debug { "ResolveTransactionsFlow.call(): Otherside Platform Version = '$counterpartyPlatformVersion': Batch mode = $batchMode" } if (initialTx != null) { fetchMissingAttachments(initialTx) @@ -52,8 +58,9 @@ class ResolveTransactionsFlow private constructor( } val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this) - resolver.downloadDependencies() + resolver.downloadDependencies(batchMode) + logger.trace { "ResolveTransactionsFlow: Sending END." } otherSide.send(FetchDataFlow.Request.End) // Finish fetching data. // If transaction resolution is performed for a transaction where some states are relevant, then those should be diff --git a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt index 5cf4ad2e59..04cc543b81 100644 --- a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt +++ b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt @@ -19,7 +19,7 @@ interface ServiceHubCoreInternal : ServiceHub { interface TransactionsResolver { @Suspendable - fun downloadDependencies() + fun downloadDependencies(batchMode: Boolean) fun recordDependencies(usedStatesToRecord: StatesToRecord) } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt index afd668f649..28ec4ed962 100644 --- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt +++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt @@ -10,6 +10,7 @@ import net.corda.core.internal.dependencies import net.corda.core.node.StatesToRecord import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.debug +import net.corda.core.utilities.trace import net.corda.core.utilities.seconds import net.corda.node.services.api.WritableTransactionStorage import java.util.* @@ -19,7 +20,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa private val logger = flow.logger @Suspendable - override fun downloadDependencies() { + override fun downloadDependencies(batchMode: Boolean) { logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" } val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage @@ -39,10 +40,12 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa // the db contain the identities that were resolved when the transaction was first checked, or should we // accept this kind of change is possible? Most likely solution is for identity data to be an attachment. - val nextRequests = LinkedHashSet(flow.txHashes) // Keep things unique but ordered, for unit test stability. + val nextRequests = LinkedHashSet(flow.txHashes) // Keep things unique but ordered, for unit test stability. val topologicalSort = TopologicalSort() + logger.debug { "DbTransactionsResolver.downloadDependencies(batchMode=$batchMode)" } while (nextRequests.isNotEmpty()) { + logger.debug { "Main fetch loop: size_remaining=${nextRequests.size}" } // Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the // graph we're traversing. nextRequests.removeAll(topologicalSort.transactionIds) @@ -76,8 +79,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa nextRequests.addAll(dependencies) } - // If the flow did not suspend on the last iteration of the downloaded loop above, perform a suspend here to ensure no write - // locks are held going into the next while loop iteration. + // If the flow did not suspend on the last iteration of the downloaded loop above, perform a suspend here to ensure that + // all data is flushed to the database. if (!suspended) { FlowLogic.sleep(0.seconds) } @@ -93,7 +96,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa override fun recordDependencies(usedStatesToRecord: StatesToRecord) { val sortedDependencies = checkNotNull(this.sortedDependencies) - logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" } + logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" } val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage for (txId in sortedDependencies) { // Retrieve and delete the transaction from the unverified store.