mirror of
https://github.com/corda/corda.git
synced 2025-06-23 01:19:00 +00:00
ENT-10009 Enhance SendTransactionFlow to allow sending a txn to multiple sessions. (#7393)
This commit is contained in:
@ -6,7 +6,6 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.crypto.isFulfilledBy
|
||||
import net.corda.core.flows.NotarySigCheck.needsNotarySignature
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
|
||||
import net.corda.core.internal.FetchDataFlow
|
||||
@ -16,7 +15,6 @@ import net.corda.core.internal.pushToLoggingContext
|
||||
import net.corda.core.internal.telemetry.telemetryServiceInternal
|
||||
import net.corda.core.internal.warnOnce
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.StatesToRecord.ALL_VISIBLE
|
||||
import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
@ -169,7 +167,6 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
}
|
||||
|
||||
private lateinit var externalTxParticipants: Set<Party>
|
||||
private lateinit var txnMetadata: TransactionMetadata
|
||||
|
||||
@Suspendable
|
||||
@Suppress("ComplexMethod", "NestedBlockDepth")
|
||||
@ -221,8 +218,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
|
||||
val requiresNotarisation = needsNotarySignature(transaction)
|
||||
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
txnMetadata = TransactionMetadata(serviceHub.myInfo.legalIdentities.first().name,
|
||||
DistributionList(statesToRecord, deriveStatesToRecord(newPlatformSessions)))
|
||||
|
||||
if (useTwoPhaseFinality) {
|
||||
val stxn = if (requiresNotarisation) {
|
||||
recordLocallyAndBroadcast(newPlatformSessions, transaction)
|
||||
@ -283,29 +279,24 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
@Suspendable
|
||||
private fun broadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) {
|
||||
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcast", flowLogic = this) {
|
||||
sessions.forEach { session ->
|
||||
try {
|
||||
logger.debug { "Sending transaction to party $session." }
|
||||
subFlow(SendTransactionFlow(session, tx, txnMetadata))
|
||||
txnMetadata = txnMetadata.copy(persist = false)
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them a transaction." +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
try {
|
||||
logger.debug { "Sending transaction to party sessions: $sessions." }
|
||||
val (participantSessions, observerSessions) = deriveSessions(sessions)
|
||||
subFlow(SendTransactionFlow(tx, participantSessions, observerSessions, statesToRecord))
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"One of the sessions ${sessions.map { it.counterparty }} has finished prematurely and we're trying to send them a transaction." +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun deriveStatesToRecord(newPlatformSessions: Collection<FlowSession>): Map<CordaX500Name, StatesToRecord> {
|
||||
val derivedObserverSessions = newPlatformSessions.map { it.counterparty }.toSet() - externalTxParticipants
|
||||
val txParticipantSessions = externalTxParticipants
|
||||
return txParticipantSessions.map { it.name to ONLY_RELEVANT }.toMap() +
|
||||
(derivedObserverSessions + observerSessions.map { it.counterparty }).map { it.name to ALL_VISIBLE }
|
||||
}
|
||||
private fun deriveSessions(newPlatformSessions: Collection<FlowSession>) =
|
||||
Pair(newPlatformSessions.filter { it.counterparty in externalTxParticipants }.toSet(),
|
||||
(observerSessions + newPlatformSessions.filter { it.counterparty !in externalTxParticipants }).toSet())
|
||||
|
||||
@Suspendable
|
||||
private fun broadcastSignaturesAndFinalise(sessions: Collection<FlowSession>, notarySignatures: List<TransactionSignature>) {
|
||||
@ -373,19 +364,16 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
logger.info("Broadcasting complete transaction to other participants.")
|
||||
if (newApi) {
|
||||
oldV3Broadcast(tx, oldParticipants.toSet())
|
||||
for (session in sessions) {
|
||||
try {
|
||||
logger.debug { "Sending transaction to party $session." }
|
||||
subFlow(SendTransactionFlow(session, tx, txnMetadata))
|
||||
txnMetadata = txnMetadata.copy(persist = false)
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
try {
|
||||
logger.debug { "Sending transaction to party sessions $sessions." }
|
||||
subFlow(SendTransactionFlow(tx, sessions.toSet(), emptySet(), statesToRecord))
|
||||
} catch (e: UnexpectedFlowEndException) {
|
||||
throw UnexpectedFlowEndException(
|
||||
"One of the sessions ${sessions.map { it.counterparty }} has finished prematurely and we're trying to send them the finalised transaction. " +
|
||||
"Did they forget to call ReceiveFinalityFlow? (${e.message})",
|
||||
e.cause,
|
||||
e.originalErrorId
|
||||
)
|
||||
}
|
||||
} else {
|
||||
oldV3Broadcast(tx, (externalTxParticipants + oldParticipants).toSet())
|
||||
@ -396,15 +384,11 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
|
||||
|
||||
@Suspendable
|
||||
private fun oldV3Broadcast(notarised: SignedTransaction, recipients: Set<Party>) {
|
||||
for (recipient in recipients) {
|
||||
if (!serviceHub.myInfo.isLegalIdentity(recipient)) {
|
||||
logger.debug { "Sending transaction to party $recipient." }
|
||||
val session = initiateFlow(recipient)
|
||||
subFlow(SendTransactionFlow(session, notarised, txnMetadata))
|
||||
txnMetadata = txnMetadata.copy(persist = false)
|
||||
logger.info("Party $recipient received the transaction.")
|
||||
}
|
||||
}
|
||||
val remoteRecipients = recipients.filter { !serviceHub.myInfo.isLegalIdentity(it) }
|
||||
logger.debug { "Sending transaction to parties $remoteRecipients." }
|
||||
val sessions = remoteRecipients.map { initiateFlow(it) }.toSet()
|
||||
subFlow(SendTransactionFlow(notarised, sessions, emptySet(), statesToRecord))
|
||||
logger.info("Parties $remoteRecipients received the transaction.")
|
||||
}
|
||||
|
||||
private fun logCommandData() {
|
||||
|
@ -23,9 +23,8 @@ data class FlowTransactionInfo(
|
||||
|
||||
@CordaSerializable
|
||||
data class TransactionMetadata(
|
||||
val initiator: CordaX500Name,
|
||||
val distributionList: DistributionList,
|
||||
val persist: Boolean = true // hint to persist to transactional store
|
||||
val initiator: CordaX500Name,
|
||||
val distributionList: DistributionList
|
||||
)
|
||||
|
||||
@CordaSerializable
|
||||
|
@ -74,14 +74,22 @@ class MaybeSerializedSignedTransaction(override val id: SecureHash, val serializ
|
||||
* the right point in the conversation to receive the sent transaction and perform the resolution back-and-forth required
|
||||
* to check the dependencies and download any missing attachments.
|
||||
*
|
||||
* @param otherSide the target party.
|
||||
* @param stx the [SignedTransaction] being sent to the [otherSideSession].
|
||||
* @property txnMetadata transaction recovery metadata (eg. used by Two Phase Finality).
|
||||
* @param stx the [SignedTransaction] being sent to the [otherSessions].
|
||||
* @param participantSessions the target parties which are participants to the transaction.
|
||||
* @param observerSessions the target parties which are observers to the transaction.
|
||||
* @param senderStatesToRecord the [StatesToRecord] relevancy information of the sender.
|
||||
*/
|
||||
open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction, txnMetadata: TransactionMetadata) : DataVendingFlow(otherSide, stx, txnMetadata) {
|
||||
constructor(otherSide: FlowSession, stx: SignedTransaction) : this(otherSide, stx,
|
||||
TransactionMetadata(DUMMY_PARTICIPANT_NAME, DistributionList(StatesToRecord.NONE, mapOf(otherSide.counterparty.name to StatesToRecord.ALL_VISIBLE))))
|
||||
// Note: DUMMY_PARTICIPANT_NAME to be substituted with actual "ourIdentity.name" in flow call()
|
||||
open class SendTransactionFlow(val stx: SignedTransaction,
|
||||
val participantSessions: Set<FlowSession>,
|
||||
val observerSessions: Set<FlowSession>,
|
||||
val senderStatesToRecord: StatesToRecord) : DataVendingFlow(participantSessions + observerSessions, stx,
|
||||
TransactionMetadata(DUMMY_PARTICIPANT_NAME,
|
||||
DistributionList(senderStatesToRecord,
|
||||
(participantSessions.map { it.counterparty.name to StatesToRecord.ONLY_RELEVANT}).toMap() +
|
||||
(observerSessions.map { it.counterparty.name to StatesToRecord.ALL_VISIBLE}).toMap()
|
||||
))) {
|
||||
constructor(otherSide: FlowSession, stx: SignedTransaction) : this(stx, setOf(otherSide), emptySet(), StatesToRecord.NONE)
|
||||
// Note: DUMMY_PARTICIPANT_NAME to be substituted with actual "ourIdentity.name" in flow call()
|
||||
companion object {
|
||||
val DUMMY_PARTICIPANT_NAME = CordaX500Name("Transaction Participant", "London", "GB")
|
||||
}
|
||||
@ -98,7 +106,8 @@ open class SendTransactionFlow(otherSide: FlowSession, stx: SignedTransaction, t
|
||||
*/
|
||||
open class SendStateAndRefFlow(otherSideSession: FlowSession, stateAndRefs: List<StateAndRef<*>>) : DataVendingFlow(otherSideSession, stateAndRefs)
|
||||
|
||||
open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any, val txnMetadata: TransactionMetadata? = null) : FlowLogic<Void?>() {
|
||||
open class DataVendingFlow(val otherSessions: Set<FlowSession>, val payload: Any, private val txnMetadata: TransactionMetadata? = null) : FlowLogic<Void?>() {
|
||||
constructor(otherSideSession: FlowSession, payload: Any, txnMetadata: TransactionMetadata? = null) : this(setOf(otherSideSession), payload, txnMetadata)
|
||||
constructor(otherSideSession: FlowSession, payload: Any) : this(otherSideSession, payload, null)
|
||||
|
||||
@Suspendable
|
||||
@ -109,7 +118,7 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any,
|
||||
// User can override this method to perform custom request verification.
|
||||
}
|
||||
|
||||
@Suppress("ComplexCondition", "ComplexMethod")
|
||||
@Suppress("ComplexCondition", "ComplexMethod", "LongMethod")
|
||||
@Suspendable
|
||||
override fun call(): Void? {
|
||||
val networkMaxMessageSize = serviceHub.networkParameters.maxMessageSize
|
||||
@ -140,115 +149,126 @@ open class DataVendingFlow(val otherSideSession: FlowSession, val payload: Any,
|
||||
|
||||
// store and share transaction recovery metadata if required
|
||||
val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
val toTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) {
|
||||
val encryptedDistributionList = (serviceHub as ServiceHubCoreInternal).recordSenderTransactionRecoveryMetadata(payload.id, txnMetadata.copy(initiator = ourIdentity.name))
|
||||
payload = SignedTransactionWithDistributionList(payload, encryptedDistributionList!!)
|
||||
val toTwoPhaseFinalityNode = otherSessions.any { otherSideSession ->
|
||||
serviceHub.networkMapCache.getNodeByLegalIdentity(otherSideSession.counterparty)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
|
||||
}
|
||||
// record transaction recovery metadata once
|
||||
val payloadWithMetadata =
|
||||
if (txnMetadata != null && toTwoPhaseFinalityNode && useTwoPhaseFinality && payload is SignedTransaction) {
|
||||
val encryptedDistributionList = (serviceHub as ServiceHubCoreInternal).recordSenderTransactionRecoveryMetadata(payload.id, txnMetadata.copy(initiator = ourIdentity.name))
|
||||
SignedTransactionWithDistributionList(payload, encryptedDistributionList!!)
|
||||
} else null
|
||||
|
||||
// This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need
|
||||
// to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of
|
||||
// data request.
|
||||
var loopCount = 0
|
||||
while (true) {
|
||||
val loopCnt = loopCount++
|
||||
logger.trace { "DataVendingFlow: Main While [$loopCnt]..." }
|
||||
val dataRequest = sendPayloadAndReceiveDataRequest(otherSideSession, payload).unwrap { request ->
|
||||
logger.trace { "sendPayloadAndReceiveDataRequest(): ${request.javaClass.name}" }
|
||||
when (request) {
|
||||
is FetchDataFlow.Request.Data -> {
|
||||
// Security TODO: Check for abnormally large or malformed data requests
|
||||
verifyDataRequest(request)
|
||||
request
|
||||
}
|
||||
FetchDataFlow.Request.End -> {
|
||||
logger.trace { "DataVendingFlow: END" }
|
||||
return null
|
||||
otherSessions.forEachIndexed { idx, otherSideSession ->
|
||||
if (payloadWithMetadata != null)
|
||||
payload = payloadWithMetadata
|
||||
// This loop will receive [FetchDataFlow.Request] continuously until the `otherSideSession` has all the data they need
|
||||
// to resolve the transaction, a [FetchDataFlow.EndRequest] will be sent from the `otherSideSession` to indicate end of
|
||||
// data request.
|
||||
var loopCount = 0
|
||||
while (true) {
|
||||
val loopCnt = loopCount++
|
||||
logger.trace { "DataVendingFlow: Main While [$loopCnt]..." }
|
||||
val dataRequest = sendPayloadAndReceiveDataRequest(otherSideSession, payload).unwrap { request ->
|
||||
logger.trace { "sendPayloadAndReceiveDataRequest(): ${request.javaClass.name}" }
|
||||
when (request) {
|
||||
is FetchDataFlow.Request.Data -> {
|
||||
// Security TODO: Check for abnormally large or malformed data requests
|
||||
verifyDataRequest(request)
|
||||
request
|
||||
}
|
||||
FetchDataFlow.Request.End -> {
|
||||
logger.trace { "DataVendingFlow: END" }
|
||||
return@forEachIndexed
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.trace { "Sending data (Type = ${dataRequest.dataType.name})" }
|
||||
var totalByteCount = 0
|
||||
var firstItem = true
|
||||
var batchFetchCountExceeded = false
|
||||
var numSent = 0
|
||||
payload = when (dataRequest.dataType) {
|
||||
FetchDataFlow.DataType.TRANSACTION -> dataRequest.hashes.map { txId ->
|
||||
logger.trace { "Sending: TRANSACTION (dataRequest.hashes.size=${dataRequest.hashes.size})" }
|
||||
if (!authorisedTransactions.isAuthorised(txId)) {
|
||||
throw FetchDataFlow.IllegalTransactionRequest(txId)
|
||||
}
|
||||
val tx = serviceHub.validatedTransactions.getTransaction(txId)
|
||||
?: throw FetchDataFlow.HashNotFound(txId)
|
||||
authorisedTransactions.removeAuthorised(tx.id)
|
||||
authorisedTransactions.addAuthorised(getInputTransactions(tx))
|
||||
totalByteCount += tx.txBits.size
|
||||
numSent++
|
||||
tx
|
||||
}
|
||||
// Loop on all items returned using dataRequest.hashes.map:
|
||||
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
|
||||
if (!authorisedTransactions.isAuthorised(txId)) {
|
||||
throw FetchDataFlow.IllegalTransactionRequest(txId)
|
||||
}
|
||||
// Maybe we should not just throw here as it's not recoverable on the client side. Might be better to send a reason code or
|
||||
// remove the restriction on sending once.
|
||||
logger.trace { "Transaction authorised OK: '$txId'" }
|
||||
var serialized: SerializedBytes<SignedTransaction>? = null
|
||||
if (!batchFetchCountExceeded) {
|
||||
// Only fetch and serialize if we have not already exceeded the maximum byte count. Once we have, no more fetching
|
||||
// is required, just reject all additional items.
|
||||
logger.trace { "Sending data (Type = ${dataRequest.dataType.name})" }
|
||||
var totalByteCount = 0
|
||||
var firstItem = true
|
||||
var batchFetchCountExceeded = false
|
||||
var numSent = 0
|
||||
payload = when (dataRequest.dataType) {
|
||||
FetchDataFlow.DataType.TRANSACTION -> dataRequest.hashes.map { txId ->
|
||||
logger.trace { "Sending: TRANSACTION (dataRequest.hashes.size=${dataRequest.hashes.size})" }
|
||||
if (!authorisedTransactions.isAuthorised(txId)) {
|
||||
throw FetchDataFlow.IllegalTransactionRequest(txId)
|
||||
}
|
||||
val tx = serviceHub.validatedTransactions.getTransaction(txId)
|
||||
?: throw FetchDataFlow.HashNotFound(txId)
|
||||
logger.trace { "Transaction get OK: '$txId'" }
|
||||
serialized = tx.serialize()
|
||||
|
||||
val itemByteCount = serialized.size
|
||||
logger.trace { "Batch-Send '$txId': first = $firstItem, Total bytes = $totalByteCount, Item byte count = $itemByteCount, Maximum = $maxPayloadSize" }
|
||||
if (firstItem || (totalByteCount + itemByteCount) < maxPayloadSize) {
|
||||
totalByteCount += itemByteCount
|
||||
numSent++
|
||||
// Always include at least one item else if the max is set too low nothing will ever get returned.
|
||||
// Splitting items will be a separate Jira if need be
|
||||
if (idx == otherSessions.size - 1)
|
||||
authorisedTransactions.removeAuthorised(tx.id)
|
||||
authorisedTransactions.addAuthorised(getInputTransactions(tx))
|
||||
logger.trace { "Adding item to return set: '$txId'" }
|
||||
} else {
|
||||
logger.trace { "Fetch block size EXCEEDED at '$txId'." }
|
||||
batchFetchCountExceeded = true
|
||||
}
|
||||
} // end
|
||||
|
||||
if (batchFetchCountExceeded) {
|
||||
logger.trace { "Excluding '$txId' from return set due to exceeded count." }
|
||||
authorisedTransactions.addAuthorised(getInputTransactions(tx))
|
||||
totalByteCount += tx.txBits.size
|
||||
numSent++
|
||||
tx
|
||||
}
|
||||
// Loop on all items returned using dataRequest.hashes.map:
|
||||
FetchDataFlow.DataType.BATCH_TRANSACTION -> dataRequest.hashes.map { txId ->
|
||||
if (!authorisedTransactions.isAuthorised(txId)) {
|
||||
throw FetchDataFlow.IllegalTransactionRequest(txId)
|
||||
}
|
||||
// Maybe we should not just throw here as it's not recoverable on the client side. Might be better to send a reason code or
|
||||
// remove the restriction on sending once.
|
||||
logger.trace { "Transaction authorised OK: '$txId'" }
|
||||
var serialized: SerializedBytes<SignedTransaction>? = null
|
||||
if (!batchFetchCountExceeded) {
|
||||
// Only fetch and serialize if we have not already exceeded the maximum byte count. Once we have, no more fetching
|
||||
// is required, just reject all additional items.
|
||||
val tx = serviceHub.validatedTransactions.getTransaction(txId)
|
||||
?: throw FetchDataFlow.HashNotFound(txId)
|
||||
logger.trace { "Transaction get OK: '$txId'" }
|
||||
serialized = tx.serialize()
|
||||
|
||||
// Send null if limit is exceeded
|
||||
val maybeserialized = MaybeSerializedSignedTransaction(txId, if (batchFetchCountExceeded) {
|
||||
null
|
||||
} else {
|
||||
serialized
|
||||
}, null)
|
||||
firstItem = false
|
||||
maybeserialized
|
||||
} // Batch response loop end
|
||||
FetchDataFlow.DataType.ATTACHMENT -> dataRequest.hashes.map {
|
||||
logger.trace { "Sending: Attachments for '$it'" }
|
||||
serviceHub.attachments.openAttachment(it)?.open()?.readFully()
|
||||
?: throw FetchDataFlow.HashNotFound(it)
|
||||
}
|
||||
FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map {
|
||||
logger.trace { "Sending: Parameters for '$it'" }
|
||||
(serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it)
|
||||
?: throw FetchDataFlow.MissingNetworkParameters(it)
|
||||
}
|
||||
FetchDataFlow.DataType.UNKNOWN -> dataRequest.hashes.map {
|
||||
logger.warn("Message from from a future version of Corda with UNKNOWN enum value for FetchDataFlow.DataType: ID='$it'")
|
||||
val itemByteCount = serialized.size
|
||||
logger.trace { "Batch-Send '$txId': first = $firstItem, Total bytes = $totalByteCount, Item byte count = $itemByteCount, Maximum = $maxPayloadSize" }
|
||||
if (firstItem || (totalByteCount + itemByteCount) < maxPayloadSize) {
|
||||
totalByteCount += itemByteCount
|
||||
numSent++
|
||||
// Always include at least one item else if the max is set too low nothing will ever get returned.
|
||||
// Splitting items will be a separate Jira if need be
|
||||
if (idx == otherSessions.size - 1)
|
||||
authorisedTransactions.removeAuthorised(tx.id)
|
||||
authorisedTransactions.addAuthorised(getInputTransactions(tx))
|
||||
logger.trace { "Adding item to return set: '$txId'" }
|
||||
} else {
|
||||
logger.trace { "Fetch block size EXCEEDED at '$txId'." }
|
||||
batchFetchCountExceeded = true
|
||||
}
|
||||
} // end
|
||||
|
||||
if (batchFetchCountExceeded) {
|
||||
logger.trace { "Excluding '$txId' from return set due to exceeded count." }
|
||||
}
|
||||
|
||||
// Send null if limit is exceeded
|
||||
val maybeserialized = MaybeSerializedSignedTransaction(txId, if (batchFetchCountExceeded) {
|
||||
null
|
||||
} else {
|
||||
serialized
|
||||
}, null)
|
||||
firstItem = false
|
||||
maybeserialized
|
||||
} // Batch response loop end
|
||||
FetchDataFlow.DataType.ATTACHMENT -> dataRequest.hashes.map {
|
||||
logger.trace { "Sending: Attachments for '$it'" }
|
||||
serviceHub.attachments.openAttachment(it)?.open()?.readFully()
|
||||
?: throw FetchDataFlow.HashNotFound(it)
|
||||
}
|
||||
FetchDataFlow.DataType.PARAMETERS -> dataRequest.hashes.map {
|
||||
logger.trace { "Sending: Parameters for '$it'" }
|
||||
(serviceHub.networkParametersService as NetworkParametersStorage).lookupSigned(it)
|
||||
?: throw FetchDataFlow.MissingNetworkParameters(it)
|
||||
}
|
||||
FetchDataFlow.DataType.UNKNOWN -> dataRequest.hashes.map {
|
||||
logger.warn("Message from from a future version of Corda with UNKNOWN enum value for FetchDataFlow.DataType: ID='$it'")
|
||||
}
|
||||
}
|
||||
logger.trace { "Block total size = $totalByteCount: Num Items = ($numSent of ${dataRequest.hashes.size} total)" }
|
||||
}
|
||||
logger.trace { "Block total size = $totalByteCount: Num Items = ($numSent of ${dataRequest.hashes.size} total)" }
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
|
Reference in New Issue
Block a user