mirror of
https://github.com/corda/corda.git
synced 2025-05-25 19:54:25 +00:00
Fix support for large attachments by de-batching tx/attachment fetch. This is a workaround until the upstream Artemis large message streaming bugs are fixed.
This commit is contained in:
parent
e83deb78ec
commit
a56540a3d6
@ -15,8 +15,7 @@ import net.corda.core.serialization.SerializeAsTokenContext
|
|||||||
*/
|
*/
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
class FetchAttachmentsFlow(requests: Set<SecureHash>,
|
||||||
otherSide: Party) : FetchDataFlow<Attachment, ByteArray>(requests, otherSide) {
|
otherSide: Party) : FetchDataFlow<Attachment, ByteArray>(requests, otherSide, ByteArray::class.java) {
|
||||||
|
|
||||||
override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid)
|
override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid)
|
||||||
|
|
||||||
override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire })
|
override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire })
|
||||||
|
@ -27,9 +27,10 @@ import java.util.*
|
|||||||
* @param T The ultimate type of the data being fetched.
|
* @param T The ultimate type of the data being fetched.
|
||||||
* @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
|
* @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
|
||||||
*/
|
*/
|
||||||
abstract class FetchDataFlow<T : NamedByHash, in W : Any>(
|
abstract class FetchDataFlow<T : NamedByHash, W : Any>(
|
||||||
protected val requests: Set<SecureHash>,
|
protected val requests: Set<SecureHash>,
|
||||||
protected val otherSide: Party) : FlowLogic<FetchDataFlow.Result<T>>() {
|
protected val otherSide: Party,
|
||||||
|
protected val wrapperType: Class<W>) : FlowLogic<FetchDataFlow.Result<T>>() {
|
||||||
|
|
||||||
@CordaSerializable
|
@CordaSerializable
|
||||||
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : IllegalArgumentException()
|
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : IllegalArgumentException()
|
||||||
@ -54,12 +55,25 @@ abstract class FetchDataFlow<T : NamedByHash, in W : Any>(
|
|||||||
return if (toFetch.isEmpty()) {
|
return if (toFetch.isEmpty()) {
|
||||||
Result(fromDisk, emptyList())
|
Result(fromDisk, emptyList())
|
||||||
} else {
|
} else {
|
||||||
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
|
logger.info("Requesting ${toFetch.size} dependency(s) for verification from ${otherSide.name}")
|
||||||
|
|
||||||
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
|
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
|
||||||
val maybeItems = sendAndReceive<ArrayList<W>>(otherSide, Request(toFetch))
|
// We can then switch to requesting items in large batches to minimise the latency penalty.
|
||||||
|
// This is blocked by bugs ARTEMIS-1278 and ARTEMIS-1279. For now we limit attachments and txns to 10mb each
|
||||||
|
// and don't request items in batch, which is a performance loss, but works around the issue. We have
|
||||||
|
// configured Artemis to not fragment messages up to 10mb so we can send 10mb messages without problems.
|
||||||
|
// Above that, we start losing authentication data on the message fragments and take exceptions in the
|
||||||
|
// network layer.
|
||||||
|
val maybeItems = ArrayList<W>(toFetch.size)
|
||||||
|
send(otherSide, Request(toFetch))
|
||||||
|
for (hash in toFetch) {
|
||||||
|
// We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse.
|
||||||
|
// The only thing checked is the object type. It is a protocol violation to send results out of order.
|
||||||
|
maybeItems += receive(wrapperType, otherSide).unwrap { it }
|
||||||
|
}
|
||||||
// Check for a buggy/malicious peer answering with something that we didn't ask for.
|
// Check for a buggy/malicious peer answering with something that we didn't ask for.
|
||||||
val downloaded = validateFetchResponse(maybeItems, toFetch)
|
val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch)
|
||||||
|
logger.info("Fetched ${downloaded.size} elements from ${otherSide.name}")
|
||||||
maybeWriteToDisk(downloaded)
|
maybeWriteToDisk(downloaded)
|
||||||
Result(fromDisk, downloaded)
|
Result(fromDisk, downloaded)
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ import net.corda.core.transactions.SignedTransaction
|
|||||||
*/
|
*/
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: Party) :
|
class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: Party) :
|
||||||
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide) {
|
FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, SignedTransaction::class.java) {
|
||||||
|
|
||||||
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
|
override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,73 @@
|
|||||||
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import net.corda.core.InputStreamAndHash
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.flows.*
|
||||||
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.messaging.startFlow
|
||||||
|
import net.corda.core.transactions.SignedTransaction
|
||||||
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
|
import net.corda.core.utilities.unwrap
|
||||||
|
import net.corda.testing.BOB
|
||||||
|
import net.corda.testing.DUMMY_NOTARY
|
||||||
|
import net.corda.testing.aliceBobAndNotary
|
||||||
|
import net.corda.testing.contracts.DummyState
|
||||||
|
import net.corda.testing.driver.driver
|
||||||
|
import org.junit.Test
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that we can add lots of large attachments to a transaction and that it works OK, e.g. does not hit the
|
||||||
|
* transaction size limit (which should only consider the hashes).
|
||||||
|
*/
|
||||||
|
class LargeTransactionsTest {
|
||||||
|
@StartableByRPC @InitiatingFlow
|
||||||
|
class SendLargeTransactionFlow(val hash1: SecureHash, val hash2: SecureHash, val hash3: SecureHash, val hash4: SecureHash) : FlowLogic<Unit>() {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
val tx = TransactionBuilder(notary = DUMMY_NOTARY)
|
||||||
|
.addOutputState(DummyState())
|
||||||
|
.addAttachment(hash1)
|
||||||
|
.addAttachment(hash2)
|
||||||
|
.addAttachment(hash3)
|
||||||
|
.addAttachment(hash4)
|
||||||
|
val stx = serviceHub.signInitialTransaction(tx, serviceHub.legalIdentityKey)
|
||||||
|
// Send to the other side and wait for it to trigger resolution from us.
|
||||||
|
sendAndReceive<Unit>(serviceHub.networkMapCache.getNodeByLegalName(BOB.name)!!.legalIdentity, stx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@InitiatedBy(SendLargeTransactionFlow::class) @Suppress("UNUSED")
|
||||||
|
class ReceiveLargeTransactionFlow(private val counterParty: Party) : FlowLogic<Unit>() {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
val stx = receive<SignedTransaction>(counterParty).unwrap { it }
|
||||||
|
subFlow(ResolveTransactionsFlow(stx, counterParty))
|
||||||
|
// Unblock the other side by sending some dummy object (Unit is fine here as it's a singleton).
|
||||||
|
send(counterParty, Unit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun checkCanSendLargeTransactions() {
|
||||||
|
// These 4 attachments yield a transaction that's got >10mb attached, so it'd push us over the Artemis
|
||||||
|
// max message size.
|
||||||
|
val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 0)
|
||||||
|
val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 1)
|
||||||
|
val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 2)
|
||||||
|
val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 3)
|
||||||
|
driver(startNodesInProcess = true) {
|
||||||
|
val (alice, _, _) = aliceBobAndNotary()
|
||||||
|
alice.useRPC {
|
||||||
|
val hash1 = it.uploadAttachment(bigFile1.inputStream)
|
||||||
|
val hash2 = it.uploadAttachment(bigFile2.inputStream)
|
||||||
|
val hash3 = it.uploadAttachment(bigFile3.inputStream)
|
||||||
|
val hash4 = it.uploadAttachment(bigFile4.inputStream)
|
||||||
|
assertEquals(hash1, bigFile1.sha256)
|
||||||
|
// Should not throw any exceptions.
|
||||||
|
it.startFlow(::SendLargeTransactionFlow, hash1, hash2, hash3, hash4).returnValue.get()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -31,7 +31,6 @@ class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTrans
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
|
|
||||||
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
|
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
|
||||||
override fun getData(id: SecureHash): ByteArray? {
|
override fun getData(id: SecureHash): ByteArray? {
|
||||||
return serviceHub.attachments.openAttachment(id)?.open()?.readBytes()
|
return serviceHub.attachments.openAttachment(id)?.open()?.readBytes()
|
||||||
@ -46,10 +45,13 @@ abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>(
|
|||||||
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
|
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
|
||||||
it
|
it
|
||||||
}
|
}
|
||||||
val response = request.hashes.map {
|
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
|
||||||
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
|
// See the discussion in FetchDataFlow. We send each item individually here in a separate asynchronous send
|
||||||
|
// call, and the other side picks them up with a straight receive call, because we batching would push us over
|
||||||
|
// the (current) Artemis message size limit.
|
||||||
|
request.hashes.forEach {
|
||||||
|
send(otherParty, getData(it) ?: throw FetchDataFlow.HashNotFound(it))
|
||||||
}
|
}
|
||||||
send(otherParty, response)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract fun getData(id: SecureHash): T?
|
protected abstract fun getData(id: SecureHash): T?
|
||||||
|
Loading…
x
Reference in New Issue
Block a user