diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index aebf94b5cd..35768010f7 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -34,6 +34,9 @@ import javax.annotation.concurrent.ThreadSafe class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration? = null, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() { private companion object { val log = loggerFor() + /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ + @JvmStatic val MAX_FILE_SIZE = 10485760 + } // TODO: Certificate handling for clients needs more work. @@ -63,6 +66,7 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio retryInterval = 5.seconds.toMillis() retryIntervalMultiplier = 1.5 // Exponential backoff maxRetryInterval = 3.minutes.toMillis() + minLargeMessageSize = MAX_FILE_SIZE serviceConfigurationOverride?.invoke(this) } sessionFactory = serverLocator.createSessionFactory() diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index b0b437d642..68434d0e8c 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -7,16 +7,16 @@ import com.google.common.base.Function import com.google.common.base.Throwables import com.google.common.io.ByteStreams import com.google.common.util.concurrent.* +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.newSecureRandom +import net.corda.core.crypto.sha256 import net.corda.core.serialization.CordaSerializable import org.slf4j.Logger import rx.Observable import rx.Observer import rx.subjects.PublishSubject import rx.subjects.UnicastSubject -import java.io.BufferedInputStream -import java.io.InputStream -import java.io.OutputStream +import java.io.* import java.math.BigDecimal import java.nio.charset.Charset import java.nio.charset.StandardCharsets.UTF_8 @@ -28,7 +28,10 @@ import java.util.concurrent.* import java.util.concurrent.locks.ReentrantLock import java.util.function.BiConsumer import java.util.stream.Stream +import java.util.zip.Deflater +import java.util.zip.ZipEntry import java.util.zip.ZipInputStream +import java.util.zip.ZipOutputStream import kotlin.concurrent.withLock import kotlin.reflect.KProperty @@ -309,6 +312,38 @@ fun extractZipFile(inputStream: InputStream, toDirectory: Path) { } } +/** + * Get a valid InputStream from an in-memory zip as required for tests. + * Note that a slightly bigger than numOfExpectedBytes size is expected. + */ +@Throws(IllegalArgumentException::class) +fun sizedInputStreamAndHash(numOfExpectedBytes : Int) : InputStreamAndHash { + if (numOfExpectedBytes <= 0) throw IllegalArgumentException("A positive number of numOfExpectedBytes is required.") + val baos = ByteArrayOutputStream() + ZipOutputStream(baos).use({ zos -> + val arraySize = 1024 + val bytes = ByteArray(arraySize) + val n = (numOfExpectedBytes - 1) / arraySize + 1 // same as Math.ceil(numOfExpectedBytes/arraySize). + zos.setLevel(Deflater.NO_COMPRESSION) + zos.putNextEntry(ZipEntry("z")) + for (i in 0 until n) { + zos.write(bytes, 0, arraySize) + } + zos.closeEntry() + }) + return getInputStreamAndHashFromOutputStream(baos) +} + +/** Convert a [ByteArrayOutputStream] to [InputStreamAndHash]. */ +fun getInputStreamAndHashFromOutputStream(baos: ByteArrayOutputStream) : InputStreamAndHash { + // TODO: Consider converting OutputStream to InputStream without creating a ByteArray, probably using piped streams. + val bytes = baos.toByteArray() + // TODO: Consider calculating sha256 on the fly using a DigestInputStream. + return InputStreamAndHash(ByteArrayInputStream(bytes), bytes.sha256()) +} + +data class InputStreamAndHash(val inputStream: InputStream, val sha256: SecureHash.SHA256) + // TODO: Generic csv printing utility for clases. val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 37bd2a8bc4..f95cb24f41 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -90,6 +90,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, val userService: RPCUserService) : ArtemisMessagingComponent() { companion object { private val log = loggerFor() + /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ + @JvmStatic val MAX_FILE_SIZE = 10485760 } private class InnerState { @@ -171,6 +173,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess isPersistIDCache = true isPopulateValidatedUser = true + journalBufferSize_NIO = MAX_FILE_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. + journalBufferSize_AIO = MAX_FILE_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store. + journalFileSize = MAX_FILE_SIZE // The size of each journal file in bytes. Artemis default is 10MiB. managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) // Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster // user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 52151b41f5..d0076ec40a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -152,6 +152,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, // TODO Add broker CN to config for host verification in case the embedded broker isn't used val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) + locator.setMinLargeMessageSize(ArtemisMessagingServer.MAX_FILE_SIZE) clientFactory = locator.createSessionFactory() // Login using the node username. The broker will authentiate us as its node (as opposed to another peer) diff --git a/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt b/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt index 5cc53e8f1a..954d4cf7b4 100644 --- a/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt +++ b/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt @@ -10,7 +10,18 @@ import org.junit.Test import java.util.concurrent.CompletableFuture class AttachmentDemoTest { - @Test fun `runs attachment demo`() { + // run with the default 1K bytes in-memory zip file. + @Test fun `attachment demo`() { + attachmentDemo() + } + + // run with a 10,000,000 bytes in-memory zip file. In practice, a slightly bigger file will be used (~10,002,000 bytes). + @Test fun `attachment demo using a 10MB zip file`() { + attachmentDemo(10000000) + } + + // An in-memory zip file will be used as InputStream, with a size slightly bigger than numOfExpectedBytes. + private fun attachmentDemo(numOfExpectedBytes: Int = 1024) { driver(dsl = { val demoUser = listOf(User("demo", "demo", setOf("StartFlow.net.corda.flows.FinalityFlow"))) val (nodeA, nodeB) = Futures.allAsList( @@ -21,7 +32,7 @@ class AttachmentDemoTest { val senderThread = CompletableFuture.supplyAsync { nodeA.rpcClientToNode().use(demoUser[0].username, demoUser[0].password) { - sender(this) + sender(this, numOfExpectedBytes) } }.exceptionally { it.printStackTrace() } diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt index 95be07aad4..3714e02f43 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt @@ -18,6 +18,8 @@ import java.nio.file.Path import java.nio.file.Paths import kotlin.system.exitProcess import kotlin.test.assertEquals +import java.io.* +import net.corda.core.sizedInputStreamAndHash internal enum class Role { SENDER, @@ -55,26 +57,32 @@ fun main(args: Array) { } } -val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9") +var EXPECTED_HASH = SecureHash.zeroHash // Note: We could use another random default value to initialize it. -fun sender(rpc: CordaRPCOps) { +/** An in memory test zip attachment of at least numOfClearBytes size, will be used. */ +fun sender(rpc: CordaRPCOps, numOfClearBytes: Int = 1024) { // default size 1K. + val (inputStream, hash) = sizedInputStreamAndHash(numOfClearBytes) + sender(rpc, inputStream, hash) +} + +fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.SHA256) { + EXPECTED_HASH = hash // Get the identity key of the other side (the recipient). val otherSide: Party = rpc.partyFromName("Bank B")!! // Make sure we have the file in storage - // TODO: We should have our own demo file, not share the trader demo file - if (!rpc.attachmentExists(PROSPECTUS_HASH)) { - Thread.currentThread().contextClassLoader.getResourceAsStream("bank-of-london-cp.jar").use { + if (!rpc.attachmentExists(hash)) { + inputStream.use { val id = rpc.uploadAttachment(it) - assertEquals(PROSPECTUS_HASH, id) + assertEquals(hash, id) } } // Create a trivial transaction that just passes across the attachment - in normal cases there would be // inputs, outputs and commands that refer to this attachment. val ptx = TransactionType.General.Builder(notary = null) - require(rpc.attachmentExists(PROSPECTUS_HASH)) - ptx.addAttachment(PROSPECTUS_HASH) + require(rpc.attachmentExists(hash)) + ptx.addAttachment(hash) // TODO: Add a dummy state and specify a notary, so that the tx hash is randomised each time and the demo can be repeated. // Despite not having any states, we have to have at least one signature on the transaction @@ -93,8 +101,8 @@ fun recipient(rpc: CordaRPCOps) { val stx = rpc.verifiedTransactions().second.toBlocking().first() val wtx = stx.tx if (wtx.attachments.isNotEmpty()) { - assertEquals(PROSPECTUS_HASH, wtx.attachments.first()) - require(rpc.attachmentExists(PROSPECTUS_HASH)) + assertEquals(EXPECTED_HASH, wtx.attachments.first()) + require(rpc.attachmentExists(EXPECTED_HASH)) println("File received - we're happy!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(wtx)}") } else { println("Error: no attachments found in ${wtx.id}")