diff --git a/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt index 1eaeff6e56..cdf9f632fb 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt @@ -15,8 +15,7 @@ import net.corda.core.serialization.SerializeAsTokenContext */ @InitiatingFlow class FetchAttachmentsFlow(requests: Set, - otherSide: Party) : FetchDataFlow(requests, otherSide) { - + otherSide: Party) : FetchDataFlow(requests, otherSide, ByteArray::class.java) { override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid) override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire }) diff --git a/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt index 28c5fdceb1..bd843fbce0 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt @@ -27,9 +27,10 @@ import java.util.* * @param T The ultimate type of the data being fetched. * @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type. */ -abstract class FetchDataFlow( +abstract class FetchDataFlow( protected val requests: Set, - protected val otherSide: Party) : FlowLogic>() { + protected val otherSide: Party, + protected val wrapperType: Class) : FlowLogic>() { @CordaSerializable class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : IllegalArgumentException() @@ -54,12 +55,25 @@ abstract class FetchDataFlow( return if (toFetch.isEmpty()) { Result(fromDisk, emptyList()) } else { - logger.trace("Requesting ${toFetch.size} dependency(s) for verification") + logger.info("Requesting ${toFetch.size} dependency(s) for verification from ${otherSide.name}") // TODO: Support "large message" response streaming so response sizes are not limited by RAM. - val maybeItems = sendAndReceive>(otherSide, Request(toFetch)) + // We can then switch to requesting items in large batches to minimise the latency penalty. + // This is blocked by bugs ARTEMIS-1278 and ARTEMIS-1279. For now we limit attachments and txns to 10mb each + // and don't request items in batch, which is a performance loss, but works around the issue. We have + // configured Artemis to not fragment messages up to 10mb so we can send 10mb messages without problems. + // Above that, we start losing authentication data on the message fragments and take exceptions in the + // network layer. + val maybeItems = ArrayList(toFetch.size) + send(otherSide, Request(toFetch)) + for (hash in toFetch) { + // We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse. + // The only thing checked is the object type. It is a protocol violation to send results out of order. + maybeItems += receive(wrapperType, otherSide).unwrap { it } + } // Check for a buggy/malicious peer answering with something that we didn't ask for. - val downloaded = validateFetchResponse(maybeItems, toFetch) + val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch) + logger.info("Fetched ${downloaded.size} elements from ${otherSide.name}") maybeWriteToDisk(downloaded) Result(fromDisk, downloaded) } diff --git a/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt index e39e16aa7c..e43a022b7a 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt @@ -14,7 +14,7 @@ import net.corda.core.transactions.SignedTransaction */ @InitiatingFlow class FetchTransactionsFlow(requests: Set, otherSide: Party) : - FetchDataFlow(requests, otherSide) { + FetchDataFlow(requests, otherSide, SignedTransaction::class.java) { override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid) } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt new file mode 100644 index 0000000000..a5c059a498 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt @@ -0,0 +1,73 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.InputStreamAndHash +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.messaging.startFlow +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.unwrap +import net.corda.testing.BOB +import net.corda.testing.DUMMY_NOTARY +import net.corda.testing.aliceBobAndNotary +import net.corda.testing.contracts.DummyState +import net.corda.testing.driver.driver +import org.junit.Test +import kotlin.test.assertEquals + +/** + * Check that we can add lots of large attachments to a transaction and that it works OK, e.g. does not hit the + * transaction size limit (which should only consider the hashes). + */ +class LargeTransactionsTest { + @StartableByRPC @InitiatingFlow + class SendLargeTransactionFlow(val hash1: SecureHash, val hash2: SecureHash, val hash3: SecureHash, val hash4: SecureHash) : FlowLogic() { + @Suspendable + override fun call() { + val tx = TransactionBuilder(notary = DUMMY_NOTARY) + .addOutputState(DummyState()) + .addAttachment(hash1) + .addAttachment(hash2) + .addAttachment(hash3) + .addAttachment(hash4) + val stx = serviceHub.signInitialTransaction(tx, serviceHub.legalIdentityKey) + // Send to the other side and wait for it to trigger resolution from us. + sendAndReceive(serviceHub.networkMapCache.getNodeByLegalName(BOB.name)!!.legalIdentity, stx) + } + } + + @InitiatedBy(SendLargeTransactionFlow::class) @Suppress("UNUSED") + class ReceiveLargeTransactionFlow(private val counterParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + val stx = receive(counterParty).unwrap { it } + subFlow(ResolveTransactionsFlow(stx, counterParty)) + // Unblock the other side by sending some dummy object (Unit is fine here as it's a singleton). + send(counterParty, Unit) + } + } + + @Test + fun checkCanSendLargeTransactions() { + // These 4 attachments yield a transaction that's got >10mb attached, so it'd push us over the Artemis + // max message size. + val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 0) + val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 1) + val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 2) + val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 3) + driver(startNodesInProcess = true) { + val (alice, _, _) = aliceBobAndNotary() + alice.useRPC { + val hash1 = it.uploadAttachment(bigFile1.inputStream) + val hash2 = it.uploadAttachment(bigFile2.inputStream) + val hash3 = it.uploadAttachment(bigFile3.inputStream) + val hash4 = it.uploadAttachment(bigFile4.inputStream) + assertEquals(hash1, bigFile1.sha256) + // Should not throw any exceptions. + it.startFlow(::SendLargeTransactionFlow, hash1, hash2, hash3, hash4).returnValue.get() + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt b/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt index 7853a3a04b..f28ce22809 100644 --- a/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt +++ b/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt @@ -31,7 +31,6 @@ class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler(otherParty) { override fun getData(id: SecureHash): ByteArray? { return serviceHub.attachments.openAttachment(id)?.open()?.readBytes() @@ -46,10 +45,13 @@ abstract class FetchDataHandler(val otherParty: Party) : FlowLogic( if (it.hashes.isEmpty()) throw FlowException("Empty hash list") it } - val response = request.hashes.map { - getData(it) ?: throw FetchDataFlow.HashNotFound(it) + // TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer. + // See the discussion in FetchDataFlow. We send each item individually here in a separate asynchronous send + // call, and the other side picks them up with a straight receive call, because we batching would push us over + // the (current) Artemis message size limit. + request.hashes.forEach { + send(otherParty, getData(it) ?: throw FetchDataFlow.HashNotFound(it)) } - send(otherParty, response) } protected abstract fun getData(id: SecureHash): T?