ENT-4474 O/S Changes to support bulk backchain fetching (#5894)

* Ongoing dev multi backchain fetch

* Final dev pre logging message improvements

* Trace messages

* Trace messages

* Code tidy up

* Code review comments

* Code review comments

* Whitespace removed

* Code review changes

* Code reformatting
Nick Dunstone 2020-01-28 09:04:18 +00:00 committed by Christian Sailer
parent 16532299c5
commit a4cada4a2e
5 changed files with 232 additions and 24 deletions

View File

@ -1,11 +1,62 @@
package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.NamedByHash
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.*
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.trace
/**
* In the words of Matt: working code is more important than pretty code. This class contains code that may
* be serialized. If it were always serialized then the local disk fetch would need to serialize and then
* de-serialize, which wastes time. However, over the wire we send batch-fetch items already serialized,
* because we need the exact length of each object to pack them into the 10MB maximum message size buffer.
* We do not want to serialize them multiple times, so it is far more efficient to send the byte stream.
*/
@CordaSerializable
class MaybeSerializedSignedTransaction(override val id: SecureHash, val serialized: SerializedBytes<SignedTransaction>?,
val nonSerialised: SignedTransaction?) : NamedByHash {
init {
check(serialized == null || nonSerialised == null) {
"MaybeSerializedSignedTransaction: Serialized and non-serialized may not both be non-null."
}
}
fun get(): SignedTransaction? {
return if (nonSerialised != null) {
nonSerialised
} else if (serialized != null) {
val tranBytes = SerializedBytes<SignedTransaction>(serialized.bytes)
tranBytes.deserialize()
} else {
null
}
}
fun isNull(): Boolean {
return serialized == null && nonSerialised == null
}
fun serializedByteCount(): Int {
return serialized?.bytes?.size ?: 0
}
fun payloadContentDescription(): String {
val tranSize = serializedByteCount()
val isSer = serialized != null
val isObj = nonSerialised != null
return if (isNull()) {
"<Null>"
} else "size = $tranSize, serialized = $isSer, isObj = $isObj"
}
}
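// Illustrative usage sketch, not part of this commit: how a caller might build the two forms. It assumes a
// SignedTransaction is already in hand and relies only on the imports present in this file; the function
// name is hypothetical.
private fun maybeSerializedUsageSketch(stx: SignedTransaction) {
    // Wire form: serialize once up front so the exact byte count is known when packing the reply buffer.
    val wireForm = MaybeSerializedSignedTransaction(stx.id, stx.serialize(), null)
    // Local form: keep the object as-is and avoid a needless serialize/deserialize round trip.
    val localForm = MaybeSerializedSignedTransaction(stx.id, null, stx)
    val recovered: SignedTransaction? = wireForm.get()    // deserializes on demand
    val wireBytes: Int = wireForm.serializedByteCount()   // 0 for the local form
    check(recovered != null && wireBytes > 0 && localForm.serializedByteCount() == 0)
}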
/**
* The [SendTransactionFlow] should be used to send a transaction to another peer that wishes to verify that transaction's
@ -40,6 +91,11 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
@Suspendable
override fun call(): Void? {
val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize
val maxPayloadSize = networkMaxMessageSize / 2
logger.trace { "DataVendingFlow: Call: Network max message size = $networkMaxMessageSize, Max Payload Size = $maxPayloadSize" }
// The first payload will be the transaction data, subsequent payloads will be the transaction/attachment/network parameters data.
var payload = payload
@ -64,20 +120,33 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
// This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need
// to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of
// data request.
var loopCount = 0
while (true) {
val loopCnt = loopCount++
logger.trace { "DataVendingFlow: Main While [$loopCnt]..." }
val dataRequest = sendPayloadAndReceiveDataRequest(otherSideSession, payload).unwrap { request ->
logger.trace { "sendPayloadAndReceiveDataRequest(): ${request.javaClass.name}" }
when (request) {
is FetchDataFlow.Request.Data -> {
// Security TODO: Check for abnormally large or malformed data requests
verifyDataRequest(request)
request
}
FetchDataFlow.Request.End -> return null
FetchDataFlow.Request.End -> {
logger.trace { "DataVendingFlow: END" }
return null
}
}
}
logger.trace { "Sending data (Type = ${dataRequest.dataType.name})" }
var totalByteCount = 0
var firstItem = true
var batchFetchCountExceeded = false
var numSent = 0
payload = when (dataRequest.dataType) {
FetchDataFlow.DataType.TRANSACTION -> dataRequest.hashes.map { txId ->
logger.trace { "Sending: TRANSACTION (dataRequest.hashes.size=${dataRequest.hashes.size})" }
if (!authorisedTransactions.isAuthorised(txId)) {
throw FetchDataFlow.IllegalTransactionRequest(txId)
}
@ -85,17 +154,71 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any)
?: throw FetchDataFlow.HashNotFound(txId)
authorisedTransactions.removeAuthorised(tx.id)
authorisedTransactions.addAuthorised(getInputTransactions(tx))
totalByteCount += tx.txBits.size
numSent++
tx
}
// Loop on all items returned using dataRequest.hashes.map:
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
if (!authorisedTransactions.isAuthorised(txId)) {
throw FetchDataFlow.IllegalTransactionRequest(txId)
}
// Maybe we should not just throw here, as it is not recoverable on the client side. It might be better to send a reason code,
// or to remove the restriction on sending only once.
logger.trace { "Transaction authorised OK: '$txId'" }
var serialized: SerializedBytes<SignedTransaction>? = null
if (!batchFetchCountExceeded) {
// Only fetch and serialize if we have not already exceeded the maximum byte count. Once we have, no more fetching
// is required, just reject all additional items.
val tx = serviceHub.validatedTransactions.getTransaction(txId)
?: throw FetchDataFlow.HashNotFound(txId)
logger.trace { "Transaction get OK: '$txId'" }
serialized = tx.serialize()
val itemByteCount = serialized.size
logger.trace { "Batch-Send '$txId': first = $firstItem, Total bytes = $totalByteCount, Item byte count = $itemByteCount, Maximum = $maxPayloadSize" }
if (firstItem || (totalByteCount + itemByteCount) < maxPayloadSize) {
totalByteCount += itemByteCount
numSent++
// Always include at least one item, otherwise if the maximum is set too low nothing will ever be returned.
// Splitting items will be covered by a separate Jira ticket if need be.
authorisedTransactions.removeAuthorised(tx.id)
authorisedTransactions.addAuthorised(getInputTransactions(tx))
logger.trace { "Adding item to return set: '$txId'" }
} else {
logger.trace { "Fetch block size EXCEEDED at '$txId'." }
batchFetchCountExceeded = true
}
} // end
if (batchFetchCountExceeded) {
logger.trace { "Excluding '$txId' from return set due to exceeded count." }
}
// Send null if limit is exceeded
val maybeSerialized = MaybeSerializedSignedTransaction(txId, if (batchFetchCountExceeded) null else serialized, null)
firstItem = false
maybeSerialized
} // Batch response loop end
FetchDataFlow.DataType.ATTACHMENT -> dataRequest.hashes.map {
logger.trace { "Sending: Attachments for '$it'" }
serviceHub.attachments.openAttachment(it)?.open()?.readFully()
?: throw FetchDataFlow.HashNotFound(it)
}
FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map {
logger.trace { "Sending: Parameters for '$it'" }
(serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it)
?: throw FetchDataFlow.MissingNetworkParameters(it)
}
FetchDataFlow.DataType.UNKNOWN -> dataRequest.hashes.map {
logger.warn("Message from from a future version of Corda with UNKNOWN enum value for FetchDataFlow.DataType: ID='$it'")
}
}
logger.trace { "Block total size = $totalByteCount: Num Items = ($numSent of ${dataRequest.hashes.size} total)" }
}
}
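// Illustrative sketch, not part of this commit: the packing rule used in the BATCH_TRANSACTION branch above,
// pulled out as a pure function. The function name is hypothetical; maxPayloadSize is half of the network's
// maxMessageSize, exactly as computed at the top of call().
fun fitsInBatch(firstItem: Boolean, totalByteCount: Int, itemByteCount: Int, maxPayloadSize: Int): Boolean =
    firstItem || (totalByteCount + itemByteCount) < maxPayloadSize
// For example, with maxMessageSize = 10MB (so maxPayloadSize = 5MB), a 7MB first item is still sent on its
// own, but once 4MB has been packed a further 3MB item fails the check (4 + 3 >= 5) and is answered with a
// null MaybeSerializedSignedTransaction instead, presumably to be fetched again by the requester later.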

View File

@ -8,10 +8,13 @@ import net.corda.core.crypto.sha256
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.MaybeSerializedSignedTransaction
import net.corda.core.internal.FetchDataFlow.DownloadedVsRequestedDataMismatch
import net.corda.core.internal.FetchDataFlow.HashNotFound
import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
import net.corda.core.serialization.CordaSerializationTransformEnumDefaults
import net.corda.core.serialization.SerializationToken
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializeAsTokenContext
@ -20,6 +23,7 @@ import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.trace
import java.nio.file.FileAlreadyExistsException
import java.util.*
@ -39,6 +43,7 @@ import java.util.*
* @param T The ultimate type of the data being fetched.
* @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
*/
sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
protected val requests: Set<SecureHash>,
protected val otherSideSession: FlowSession,
@ -54,7 +59,8 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
class MissingNetworkParameters(val requested: SecureHash) : FlowException("Failed to fetch network parameters with hash: $requested")
class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction (${requested}) that is not in the transitive dependency graph of the sent transaction.")
class IllegalTransactionRequest(val requested: SecureHash) : FlowException("Illegal attempt to request a transaction ($requested)"
+ " that is not in the transitive dependency graph of the sent transaction.")
@CordaSerializable
data class Result<out T : NamedByHash>(val fromDisk: List<T>, val downloaded: List<T>)
@ -65,9 +71,18 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
object End : Request()
}
// https://docs.corda.net/serialization-enum-evolution.html
// The annotations below map the two new enum values (BATCH_TRANSACTION and UNKNOWN) onto TRANSACTION. The effect is that a node
// that does not have these enum values will not throw an error during deserialization if it receives one of them. The purpose of
// adding UNKNOWN is that future additions can default to UNKNOWN rather than to an existing value. In this instance we are
// protecting against the absence of UNKNOWN by using the platform version as a guard.
@CordaSerializationTransformEnumDefaults(
CordaSerializationTransformEnumDefault("BATCH_TRANSACTION", "TRANSACTION"),
CordaSerializationTransformEnumDefault("UNKNOWN", "TRANSACTION")
)
@CordaSerializable
enum class DataType {
TRANSACTION, ATTACHMENT, PARAMETERS
TRANSACTION, ATTACHMENT, PARAMETERS, BATCH_TRANSACTION, UNKNOWN
}
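// Illustrative sketch, not part of this commit: per the note above, a future addition to DataType should
// default to UNKNOWN rather than to an existing value, for example (the new value name is hypothetical):
//
// @CordaSerializationTransformEnumDefault("SOME_FUTURE_TYPE", "UNKNOWN")
//
// Nodes at this version would then deserialize the new value as UNKNOWN, which DataVendingFlow (earlier in
// this diff) handles by logging a warning rather than silently treating it as a TRANSACTION request.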
@Suspendable
@ -77,9 +92,11 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
val (fromDisk, toFetch) = loadWhatWeHave()
return if (toFetch.isEmpty()) {
logger.trace { "FetchDataFlow.call(): loadWhatWeHave(): From disk size = ${fromDisk.size}: No items to fetch." }
val loadedFromDisk = loadExpected(fromDisk)
Result(loadedFromDisk, emptyList())
} else {
logger.trace { "FetchDataFlow.call(): loadWhatWeHave(): From disk size = ${fromDisk.size}, To-fetch size = ${toFetch.size}" }
logger.debug { "Requesting ${toFetch.size} dependency(s) for verification from ${otherSideSession.counterparty.name}" }
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
@ -89,17 +106,27 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
// configured Artemis to not fragment messages up to 10mb so we can send 10mb messages without problems.
// Above that, we start losing authentication data on the message fragments and take exceptions in the
// network layer.
val maybeItems = ArrayList<W>(toFetch.size)
for (hash in toFetch) {
val maybeItems = ArrayList<W>()
if (toFetch.size == 1) {
val hash = toFetch.single()
// We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse.
// The only thing checked is the object type. It is a protocol violation to send results out of order.
// The only thing checked is the object type.
// TODO We need to page here after large messages will work.
logger.trace { "[Single fetch]: otherSideSession.sendAndReceive($hash): Fetch type: ${dataType.name}" }
// We should only pass a single-item dataType below.
maybeItems += otherSideSession.sendAndReceive<List<W>>(Request.Data(NonEmptySet.of(hash), dataType)).unwrap { it }
} else {
logger.trace { "[Batch fetch]: otherSideSession.sendAndReceive(set of ${toFetch.size}): Fetch type: ${dataType.name})" }
maybeItems += otherSideSession.sendAndReceive<List<W>>(Request.Data(NonEmptySet.copyOf(toFetch), dataType))
.unwrap { it }
logger.trace { "[Batch fetch]: otherSideSession.sendAndReceive Done: count= ${maybeItems.size})" }
}
// Check for a buggy/malicious peer answering with something that we didn't ask for.
val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch)
logger.debug { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}" }
logger.trace { "Fetched ${downloaded.size} elements from ${otherSideSession.counterparty.name}, maybeItems.size = ${maybeItems.size}" }
maybeWriteToDisk(downloaded)
// Re-load items already present before the download procedure. This ensures these objects are not unnecessarily checkpointed.
val loadedFromDisk = loadExpected(fromDisk)
Result(loadedFromDisk, downloaded)
@ -110,9 +137,9 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
// Do nothing by default.
}
private fun loadWhatWeHave(): Pair<List<SecureHash>, List<SecureHash>> {
private fun loadWhatWeHave(): Pair<List<SecureHash>, Set<SecureHash>> {
val fromDisk = ArrayList<SecureHash>()
val toFetch = ArrayList<SecureHash>()
val toFetch = LinkedHashSet<SecureHash>()
for (txid in requests) {
val stx = load(txid)
if (stx == null)
@ -137,18 +164,52 @@ sealed class FetchDataFlow<T : NamedByHash, in W : Any>(
protected open fun convert(wire: W): T = uncheckedCast(wire)
@Suppress("ComplexMethod")
private fun validateFetchResponse(maybeItems: UntrustworthyData<ArrayList<W>>,
requests: List<SecureHash>): List<T> {
requests: Set<SecureHash>): List<T> {
return maybeItems.unwrap { response ->
if (response.size != requests.size)
throw DownloadedVsRequestedSizeMismatch(requests.size, response.size)
logger.trace { "validateFetchResponse(): Response size = ${response.size}, Request size = ${requests.size}" }
if (response.size != requests.size) {
logger.trace { "maybeItems.unwrap: RespType Response.size (${requests.size}) != requests.size (${response.size})" }
throw FetchDataFlow.DownloadedVsRequestedSizeMismatch(requests.size, response.size)
}
if (logger.isTraceEnabled()) {
logger.trace { "Request size = ${requests.size}" }
for ((reqInd, req) in requests.withIndex()) {
logger.trace { "Requested[$reqInd] = '$req'" }
}
}
val answers = response.map { convert(it) }
if (logger.isTraceEnabled()) {
logger.trace { "Answers size = ${answers.size}" }
for ((respInd, item) in answers.withIndex()) {
if (item is MaybeSerializedSignedTransaction) {
logger.trace { "ValidateItem[$respInd]: '${item.id}': Type = MaybeSerializedSignedTransaction: ${item.payloadContentDescription()}" }
} else {
logger.trace("ValidateItem[$respInd]: Type = ${item.javaClass.name}")
}
}
}
// Check transactions actually hash to what we requested, if this fails the remote node
// is a malicious flow violator or buggy.
for ((index, item) in answers.withIndex()) {
if (item.id != requests[index])
throw DownloadedVsRequestedDataMismatch(requests[index], item.id)
var badDataIndex = -1
var badDataId: SecureHash? = null
for ((index, item) in requests.withIndex()) {
if (item != answers[index].id) {
badDataIndex = index
badDataId = item
logger.info("Will Throw on DownloadedVsRequestedDataMismatch(Req item = '$item', Resp item = '${answers[index].id}'")
}
}
if (badDataIndex >= 0 && badDataId != null) {
logger.error("Throwing DownloadedVsRequestedDataMismatch due to bad verification on: ID = $badDataId, Answer[$badDataIndex]='${answers[badDataIndex].id}'")
throw DownloadedVsRequestedDataMismatch(badDataId, answers[badDataIndex].id)
}
answers
}
}
@ -212,13 +273,27 @@ class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
}
class FetchBatchTransactionsFlow(requests: Set<SecureHash>, otherSide: FlowSession) :
FetchDataFlow<MaybeSerializedSignedTransaction, MaybeSerializedSignedTransaction>(requests, otherSide, DataType.BATCH_TRANSACTION) {
override fun load(txid: SecureHash): MaybeSerializedSignedTransaction? {
val tran = serviceHub.validatedTransactions.getTransaction(txid)
return if (tran == null) {
null
} else {
MaybeSerializedSignedTransaction(txid, null, tran)
}
}
}
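// Illustrative sketch, not part of this commit: how a resolver might choose between the single and batch
// fetch flows using the batchMode flag introduced in ResolveTransactionsFlow (later in this diff). The
// helper name is hypothetical and it assumes a FlowLogic and FlowSession are available to the caller.
@Suspendable
private fun fetchDependenciesSketch(flow: FlowLogic<*>, otherSide: FlowSession, requests: Set<SecureHash>,
                                    batchMode: Boolean): List<SignedTransaction> {
    return if (batchMode) {
        val result = flow.subFlow(FetchBatchTransactionsFlow(requests, otherSide))
        // Null entries mean the sender hit its payload budget; those hashes would need to be re-requested.
        (result.fromDisk + result.downloaded).mapNotNull { it.get() }
    } else {
        val result = flow.subFlow(FetchTransactionsFlow(requests, otherSide))
        result.fromDisk + result.downloaded
    }
}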
/**
* Given a set of hashes either loads from local network parameters storage or requests them from the other peer. Downloaded
* network parameters are saved to local parameters storage automatically. This flow can be used only if the minimumPlatformVersion is >= 4.
* Nodes on lower versions won't respond to this flow.
*/
class FetchNetworkParametersFlow(requests: Set<SecureHash>,
otherSide: FlowSession) : FetchDataFlow<SignedDataWithCert<NetworkParameters>, SignedDataWithCert<NetworkParameters>>(requests, otherSide, DataType.PARAMETERS) {
otherSide: FlowSession) : FetchDataFlow<SignedDataWithCert<NetworkParameters>,
SignedDataWithCert<NetworkParameters>>(requests, otherSide, DataType.PARAMETERS) {
override fun load(txid: SecureHash): SignedDataWithCert<NetworkParameters>? {
return (serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(txid)
}
@ -235,4 +310,4 @@ class FetchNetworkParametersFlow(requests: Set<SecureHash>,
}
}
}
}
}

View File

@ -9,6 +9,8 @@ import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
/**
* Resolves transactions for the specified [txHashes] along with their full history (dependency graph) from [otherSide].
@ -36,15 +38,19 @@ class ResolveTransactionsFlow private constructor(
private var fetchNetParamsFromCounterpart = false
@Suppress("MagicNumber")
@Suspendable
override fun call() {
// TODO This error should actually cause the flow to be sent to the flow hospital to be retried
val counterpartyPlatformVersion = checkNotNull(serviceHub.networkMapCache.getNodeByLegalIdentity(otherSide.counterparty)?.platformVersion) {
"Couldn't retrieve party's ${otherSide.counterparty} platform version from NetworkMapCache"
}
// The fetch-missing-parameters flow was added in platform version 4. This check is needed so we don't end up with a V4 node
// sending a parameters request to a V3 node that doesn't know about this protocol.
fetchNetParamsFromCounterpart = counterpartyPlatformVersion >= 4
val batchMode = counterpartyPlatformVersion >= 6
logger.debug { "ResolveTransactionsFlow.call(): Otherside Platform Version = '$counterpartyPlatformVersion': Batch mode = $batchMode" }
if (initialTx != null) {
fetchMissingAttachments(initialTx)
@ -52,8 +58,9 @@ class ResolveTransactionsFlow private constructor(
}
val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
resolver.downloadDependencies()
resolver.downloadDependencies(batchMode)
logger.trace { "ResolveTransactionsFlow: Sending END." }
otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
// If transaction resolution is performed for a transaction where some states are relevant, then those should be

View File

@ -19,7 +19,7 @@ interface ServiceHubCoreInternal : ServiceHub {
interface TransactionsResolver {
@Suspendable
fun downloadDependencies()
fun downloadDependencies(batchMode: Boolean)
fun recordDependencies(usedStatesToRecord: StatesToRecord)
}

View File

@ -10,6 +10,7 @@ import net.corda.core.internal.dependencies
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.core.utilities.seconds
import net.corda.node.services.api.WritableTransactionStorage
import java.util.*
@ -19,7 +20,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
private val logger = flow.logger
@Suspendable
override fun downloadDependencies() {
override fun downloadDependencies(batchMode: Boolean) {
logger.debug { "Downloading dependencies for transactions ${flow.txHashes}" }
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
@ -39,10 +40,12 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
// the db contain the identities that were resolved when the transaction was first checked, or should we
// accept this kind of change is possible? Most likely solution is for identity data to be an attachment.
val nextRequests = LinkedHashSet<SecureHash>(flow.txHashes) // Keep things unique but ordered, for unit test stability.
val topologicalSort = TopologicalSort()
logger.debug { "DbTransactionsResolver.downloadDependencies(batchMode=$batchMode)" }
while (nextRequests.isNotEmpty()) {
logger.debug { "Main fetch loop: size_remaining=${nextRequests.size}" }
// Don't re-download the same tx when we haven't verified it yet but it's referenced multiple times in the
// graph we're traversing.
nextRequests.removeAll(topologicalSort.transactionIds)
@ -76,8 +79,8 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
nextRequests.addAll(dependencies)
}
// If the flow did not suspend on the last iteration of the downloaded loop above, perform a suspend here to ensure no write
// locks are held going into the next while loop iteration.
// If the flow did not suspend on the last iteration of the downloaded loop above, perform a suspend here to ensure that
// all data is flushed to the database.
if (!suspended) {
FlowLogic.sleep(0.seconds)
}
@ -93,7 +96,7 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
override fun recordDependencies(usedStatesToRecord: StatesToRecord) {
val sortedDependencies = checkNotNull(this.sortedDependencies)
logger.debug { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
logger.trace { "Recording ${sortedDependencies.size} dependencies for ${flow.txHashes.size} transactions" }
val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
for (txId in sortedDependencies) {
// Retrieve and delete the transaction from the unverified store.