From a56540a3d63fd327a99560d65ea8dfb25a8cafa7 Mon Sep 17 00:00:00 2001
From: Mike Hearn <mike@r3cev.com>
Date: Wed, 12 Jul 2017 12:18:49 +0200
Subject: [PATCH] Fix support for large attachments by de-batching
 tx/attachment fetch. This is a workaround until the upstream Artemis large
 message streaming bugs are fixed.

---
 .../corda/core/flows/FetchAttachmentsFlow.kt  |  3 +-
 .../net/corda/core/flows/FetchDataFlow.kt     | 24 ++++--
 .../corda/core/flows/FetchTransactionsFlow.kt |  2 +-
 .../statemachine/LargeTransactionsTest.kt     | 73 +++++++++++++++++++
 .../corda/node/services/CoreFlowHandlers.kt   | 10 ++-
 5 files changed, 100 insertions(+), 12 deletions(-)
 create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt

diff --git a/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt
index 1eaeff6e56..cdf9f632fb 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FetchAttachmentsFlow.kt
@@ -15,8 +15,7 @@ import net.corda.core.serialization.SerializeAsTokenContext
  */
 @InitiatingFlow
 class FetchAttachmentsFlow(requests: Set<SecureHash>,
-                           otherSide: Party) : FetchDataFlow<Attachment, ByteArray>(requests, otherSide) {
-
+                           otherSide: Party) : FetchDataFlow<Attachment, ByteArray>(requests, otherSide, ByteArray::class.java) {
     override fun load(txid: SecureHash): Attachment? = serviceHub.attachments.openAttachment(txid)
 
     override fun convert(wire: ByteArray): Attachment = FetchedAttachment({ wire })
diff --git a/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt
index 28c5fdceb1..bd843fbce0 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FetchDataFlow.kt
@@ -27,9 +27,10 @@ import java.util.*
  * @param T The ultimate type of the data being fetched.
  * @param W The wire type of the data being fetched, for when it isn't the same as the ultimate type.
  */
-abstract class FetchDataFlow<T : NamedByHash, in W : Any>(
+abstract class FetchDataFlow<T : NamedByHash, W : Any>(
         protected val requests: Set<SecureHash>,
-        protected val otherSide: Party) : FlowLogic<FetchDataFlow.Result<T>>() {
+        protected val otherSide: Party,
+        protected val wrapperType: Class<W>) : FlowLogic<FetchDataFlow.Result<T>>() {
 
     @CordaSerializable
     class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : IllegalArgumentException()
@@ -54,12 +55,25 @@ abstract class FetchDataFlow<T : NamedByHash, in W : Any>(
         return if (toFetch.isEmpty()) {
             Result(fromDisk, emptyList())
         } else {
-            logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
+            logger.info("Requesting ${toFetch.size} dependency(s) for verification from ${otherSide.name}")
 
             // TODO: Support "large message" response streaming so response sizes are not limited by RAM.
-            val maybeItems = sendAndReceive<ArrayList<W>>(otherSide, Request(toFetch))
+            // We can then switch to requesting items in large batches to minimise the latency penalty.
+            // This is blocked by bugs ARTEMIS-1278 and ARTEMIS-1279. For now we limit attachments and txns to 10mb each
+            // and don't request items in batch, which is a performance loss, but works around the issue. We have
+            // configured Artemis to not fragment messages up to 10mb so we can send 10mb messages without problems.
+            // Above that, we start losing authentication data on the message fragments and take exceptions in the
+            // network layer.
+            val maybeItems = ArrayList<W>(toFetch.size)
+            send(otherSide, Request(toFetch))
+            for (hash in toFetch) {
+                // We skip the validation here (with unwrap { it }) because we will do it below in validateFetchResponse.
+                // The only thing checked is the object type. It is a protocol violation to send results out of order.
+                maybeItems += receive(wrapperType, otherSide).unwrap { it }
+            }
             // Check for a buggy/malicious peer answering with something that we didn't ask for.
-            val downloaded = validateFetchResponse(maybeItems, toFetch)
+            val downloaded = validateFetchResponse(UntrustworthyData(maybeItems), toFetch)
+            logger.info("Fetched ${downloaded.size} elements from ${otherSide.name}")
             maybeWriteToDisk(downloaded)
             Result(fromDisk, downloaded)
         }
diff --git a/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt
index e39e16aa7c..e43a022b7a 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FetchTransactionsFlow.kt
@@ -14,7 +14,7 @@ import net.corda.core.transactions.SignedTransaction
  */
 @InitiatingFlow
 class FetchTransactionsFlow(requests: Set<SecureHash>, otherSide: Party) :
-        FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide) {
+        FetchDataFlow<SignedTransaction, SignedTransaction>(requests, otherSide, SignedTransaction::class.java) {
 
     override fun load(txid: SecureHash): SignedTransaction? = serviceHub.validatedTransactions.getTransaction(txid)
 }
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt
new file mode 100644
index 0000000000..a5c059a498
--- /dev/null
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/LargeTransactionsTest.kt
@@ -0,0 +1,73 @@
+package net.corda.node.services.statemachine
+
+import co.paralleluniverse.fibers.Suspendable
+import net.corda.core.InputStreamAndHash
+import net.corda.core.crypto.SecureHash
+import net.corda.core.flows.*
+import net.corda.core.identity.Party
+import net.corda.core.messaging.startFlow
+import net.corda.core.transactions.SignedTransaction
+import net.corda.core.transactions.TransactionBuilder
+import net.corda.core.utilities.unwrap
+import net.corda.testing.BOB
+import net.corda.testing.DUMMY_NOTARY
+import net.corda.testing.aliceBobAndNotary
+import net.corda.testing.contracts.DummyState
+import net.corda.testing.driver.driver
+import org.junit.Test
+import kotlin.test.assertEquals
+
+/**
+ * Check that we can add lots of large attachments to a transaction and that it works OK, e.g. does not hit the
+ * transaction size limit (which should only consider the hashes).
+ */
+class LargeTransactionsTest {
+    @StartableByRPC @InitiatingFlow
+    class SendLargeTransactionFlow(val hash1: SecureHash, val hash2: SecureHash, val hash3: SecureHash, val hash4: SecureHash) : FlowLogic<Unit>() {
+        @Suspendable
+        override fun call() {
+            val tx = TransactionBuilder(notary = DUMMY_NOTARY)
+                    .addOutputState(DummyState())
+                    .addAttachment(hash1)
+                    .addAttachment(hash2)
+                    .addAttachment(hash3)
+                    .addAttachment(hash4)
+            val stx = serviceHub.signInitialTransaction(tx, serviceHub.legalIdentityKey)
+            // Send to the other side and wait for it to trigger resolution from us.
+            sendAndReceive<Unit>(serviceHub.networkMapCache.getNodeByLegalName(BOB.name)!!.legalIdentity, stx)
+        }
+    }
+
+    @InitiatedBy(SendLargeTransactionFlow::class) @Suppress("UNUSED")
+    class ReceiveLargeTransactionFlow(private val counterParty: Party) : FlowLogic<Unit>() {
+        @Suspendable
+        override fun call() {
+            val stx = receive<SignedTransaction>(counterParty).unwrap { it }
+            subFlow(ResolveTransactionsFlow(stx, counterParty))
+            // Unblock the other side by sending some dummy object (Unit is fine here as it's a singleton).
+            send(counterParty, Unit)
+        }
+    }
+
+    @Test
+    fun checkCanSendLargeTransactions() {
+        // These 4 attachments yield a transaction that's got >10mb attached, so it'd push us over the Artemis
+        // max message size.
+        val bigFile1 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 0)
+        val bigFile2 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 1)
+        val bigFile3 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 2)
+        val bigFile4 = InputStreamAndHash.createInMemoryTestZip(1024*1024*3, 3)
+        driver(startNodesInProcess = true) {
+            val (alice, _, _) = aliceBobAndNotary()
+            alice.useRPC {
+                val hash1 = it.uploadAttachment(bigFile1.inputStream)
+                val hash2 = it.uploadAttachment(bigFile2.inputStream)
+                val hash3 = it.uploadAttachment(bigFile3.inputStream)
+                val hash4 = it.uploadAttachment(bigFile4.inputStream)
+                assertEquals(hash1, bigFile1.sha256)
+                // Should not throw any exceptions.
+                it.startFlow(::SendLargeTransactionFlow, hash1, hash2, hash3, hash4).returnValue.get()
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt b/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt
index 7853a3a04b..f28ce22809 100644
--- a/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt
+++ b/node/src/main/kotlin/net/corda/node/services/CoreFlowHandlers.kt
@@ -31,7 +31,6 @@ class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTrans
     }
 }
 
-// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
 class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
     override fun getData(id: SecureHash): ByteArray? {
         return serviceHub.attachments.openAttachment(id)?.open()?.readBytes()
@@ -46,10 +45,13 @@ abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>(
             if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
             it
         }
-        val response = request.hashes.map {
-            getData(it) ?: throw FetchDataFlow.HashNotFound(it)
+        // TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
+        // See the discussion in FetchDataFlow. We send each item individually here in a separate asynchronous send
+        // call, and the other side picks them up with a straight receive call, because we batching would push us over
+        // the (current) Artemis message size limit.
+        request.hashes.forEach {
+            send(otherParty, getData(it) ?: throw FetchDataFlow.HashNotFound(it))
         }
-        send(otherParty, response)
     }
 
     protected abstract fun getData(id: SecureHash): T?