Artemis - allow attachments to a maximum of 10MiB including any headers. (#452)

Allow attachments to a maximum of 10MiB including any headers.
This commit is contained in:
Konstantinos Chalkias 2017-04-03 18:42:21 +01:00 committed by GitHub
parent 413e39903d
commit dbd82705aa
6 changed files with 79 additions and 15 deletions

View File

@ -34,6 +34,9 @@ import javax.annotation.concurrent.ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration? = null, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() { class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration? = null, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
private companion object { private companion object {
val log = loggerFor<CordaRPCClient>() val log = loggerFor<CordaRPCClient>()
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
@JvmStatic val MAX_FILE_SIZE = 10485760
} }
// TODO: Certificate handling for clients needs more work. // TODO: Certificate handling for clients needs more work.
@ -63,6 +66,7 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio
retryInterval = 5.seconds.toMillis() retryInterval = 5.seconds.toMillis()
retryIntervalMultiplier = 1.5 // Exponential backoff retryIntervalMultiplier = 1.5 // Exponential backoff
maxRetryInterval = 3.minutes.toMillis() maxRetryInterval = 3.minutes.toMillis()
minLargeMessageSize = MAX_FILE_SIZE
serviceConfigurationOverride?.invoke(this) serviceConfigurationOverride?.invoke(this)
} }
sessionFactory = serverLocator.createSessionFactory() sessionFactory = serverLocator.createSessionFactory()

View File

@ -7,16 +7,16 @@ import com.google.common.base.Function
import com.google.common.base.Throwables import com.google.common.base.Throwables
import com.google.common.io.ByteStreams import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.* import com.google.common.util.concurrent.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.sha256
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable import rx.Observable
import rx.Observer import rx.Observer
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.io.BufferedInputStream import java.io.*
import java.io.InputStream
import java.io.OutputStream
import java.math.BigDecimal import java.math.BigDecimal
import java.nio.charset.Charset import java.nio.charset.Charset
import java.nio.charset.StandardCharsets.UTF_8 import java.nio.charset.StandardCharsets.UTF_8
@ -28,7 +28,10 @@ import java.util.concurrent.*
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.function.BiConsumer import java.util.function.BiConsumer
import java.util.stream.Stream import java.util.stream.Stream
import java.util.zip.Deflater
import java.util.zip.ZipEntry
import java.util.zip.ZipInputStream import java.util.zip.ZipInputStream
import java.util.zip.ZipOutputStream
import kotlin.concurrent.withLock import kotlin.concurrent.withLock
import kotlin.reflect.KProperty import kotlin.reflect.KProperty
@ -309,6 +312,38 @@ fun extractZipFile(inputStream: InputStream, toDirectory: Path) {
} }
} }
/**
* Get a valid InputStream from an in-memory zip as required for tests.
* Note that a slightly bigger than numOfExpectedBytes size is expected.
*/
@Throws(IllegalArgumentException::class)
fun sizedInputStreamAndHash(numOfExpectedBytes : Int) : InputStreamAndHash {
if (numOfExpectedBytes <= 0) throw IllegalArgumentException("A positive number of numOfExpectedBytes is required.")
val baos = ByteArrayOutputStream()
ZipOutputStream(baos).use({ zos ->
val arraySize = 1024
val bytes = ByteArray(arraySize)
val n = (numOfExpectedBytes - 1) / arraySize + 1 // same as Math.ceil(numOfExpectedBytes/arraySize).
zos.setLevel(Deflater.NO_COMPRESSION)
zos.putNextEntry(ZipEntry("z"))
for (i in 0 until n) {
zos.write(bytes, 0, arraySize)
}
zos.closeEntry()
})
return getInputStreamAndHashFromOutputStream(baos)
}
/** Convert a [ByteArrayOutputStream] to [InputStreamAndHash]. */
fun getInputStreamAndHashFromOutputStream(baos: ByteArrayOutputStream) : InputStreamAndHash {
// TODO: Consider converting OutputStream to InputStream without creating a ByteArray, probably using piped streams.
val bytes = baos.toByteArray()
// TODO: Consider calculating sha256 on the fly using a DigestInputStream.
return InputStreamAndHash(ByteArrayInputStream(bytes), bytes.sha256())
}
data class InputStreamAndHash(val inputStream: InputStream, val sha256: SecureHash.SHA256)
// TODO: Generic csv printing utility for clases. // TODO: Generic csv printing utility for clases.
val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this) val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this)

View File

@ -90,6 +90,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
val userService: RPCUserService) : ArtemisMessagingComponent() { val userService: RPCUserService) : ArtemisMessagingComponent() {
companion object { companion object {
private val log = loggerFor<ArtemisMessagingServer>() private val log = loggerFor<ArtemisMessagingServer>()
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
@JvmStatic val MAX_FILE_SIZE = 10485760
} }
private class InnerState { private class InnerState {
@ -171,6 +173,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true isPersistIDCache = true
isPopulateValidatedUser = true isPopulateValidatedUser = true
journalBufferSize_NIO = MAX_FILE_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferSize_AIO = MAX_FILE_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalFileSize = MAX_FILE_SIZE // The size of each journal file in bytes. Artemis default is 10MiB.
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
// Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster // Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster
// user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its // user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its

View File

@ -152,6 +152,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// TODO Add broker CN to config for host verification in case the embedded broker isn't used // TODO Add broker CN to config for host verification in case the embedded broker isn't used
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config) val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
locator.setMinLargeMessageSize(ArtemisMessagingServer.MAX_FILE_SIZE)
clientFactory = locator.createSessionFactory() clientFactory = locator.createSessionFactory()
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer) // Login using the node username. The broker will authentiate us as its node (as opposed to another peer)

View File

@ -10,7 +10,18 @@ import org.junit.Test
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
class AttachmentDemoTest { class AttachmentDemoTest {
@Test fun `runs attachment demo`() { // run with the default 1K bytes in-memory zip file.
@Test fun `attachment demo`() {
attachmentDemo()
}
// run with a 10,000,000 bytes in-memory zip file. In practice, a slightly bigger file will be used (~10,002,000 bytes).
@Test fun `attachment demo using a 10MB zip file`() {
attachmentDemo(10000000)
}
// An in-memory zip file will be used as InputStream, with a size slightly bigger than numOfExpectedBytes.
private fun attachmentDemo(numOfExpectedBytes: Int = 1024) {
driver(dsl = { driver(dsl = {
val demoUser = listOf(User("demo", "demo", setOf("StartFlow.net.corda.flows.FinalityFlow"))) val demoUser = listOf(User("demo", "demo", setOf("StartFlow.net.corda.flows.FinalityFlow")))
val (nodeA, nodeB) = Futures.allAsList( val (nodeA, nodeB) = Futures.allAsList(
@ -21,7 +32,7 @@ class AttachmentDemoTest {
val senderThread = CompletableFuture.supplyAsync { val senderThread = CompletableFuture.supplyAsync {
nodeA.rpcClientToNode().use(demoUser[0].username, demoUser[0].password) { nodeA.rpcClientToNode().use(demoUser[0].username, demoUser[0].password) {
sender(this) sender(this, numOfExpectedBytes)
} }
}.exceptionally { it.printStackTrace() } }.exceptionally { it.printStackTrace() }

View File

@ -18,6 +18,8 @@ import java.nio.file.Path
import java.nio.file.Paths import java.nio.file.Paths
import kotlin.system.exitProcess import kotlin.system.exitProcess
import kotlin.test.assertEquals import kotlin.test.assertEquals
import java.io.*
import net.corda.core.sizedInputStreamAndHash
internal enum class Role { internal enum class Role {
SENDER, SENDER,
@ -55,26 +57,32 @@ fun main(args: Array<String>) {
} }
} }
val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9") var EXPECTED_HASH = SecureHash.zeroHash // Note: We could use another random default value to initialize it.
fun sender(rpc: CordaRPCOps) { /** An in memory test zip attachment of at least numOfClearBytes size, will be used. */
fun sender(rpc: CordaRPCOps, numOfClearBytes: Int = 1024) { // default size 1K.
val (inputStream, hash) = sizedInputStreamAndHash(numOfClearBytes)
sender(rpc, inputStream, hash)
}
fun sender(rpc: CordaRPCOps, inputStream: InputStream, hash: SecureHash.SHA256) {
EXPECTED_HASH = hash
// Get the identity key of the other side (the recipient). // Get the identity key of the other side (the recipient).
val otherSide: Party = rpc.partyFromName("Bank B")!! val otherSide: Party = rpc.partyFromName("Bank B")!!
// Make sure we have the file in storage // Make sure we have the file in storage
// TODO: We should have our own demo file, not share the trader demo file if (!rpc.attachmentExists(hash)) {
if (!rpc.attachmentExists(PROSPECTUS_HASH)) { inputStream.use {
Thread.currentThread().contextClassLoader.getResourceAsStream("bank-of-london-cp.jar").use {
val id = rpc.uploadAttachment(it) val id = rpc.uploadAttachment(it)
assertEquals(PROSPECTUS_HASH, id) assertEquals(hash, id)
} }
} }
// Create a trivial transaction that just passes across the attachment - in normal cases there would be // Create a trivial transaction that just passes across the attachment - in normal cases there would be
// inputs, outputs and commands that refer to this attachment. // inputs, outputs and commands that refer to this attachment.
val ptx = TransactionType.General.Builder(notary = null) val ptx = TransactionType.General.Builder(notary = null)
require(rpc.attachmentExists(PROSPECTUS_HASH)) require(rpc.attachmentExists(hash))
ptx.addAttachment(PROSPECTUS_HASH) ptx.addAttachment(hash)
// TODO: Add a dummy state and specify a notary, so that the tx hash is randomised each time and the demo can be repeated. // TODO: Add a dummy state and specify a notary, so that the tx hash is randomised each time and the demo can be repeated.
// Despite not having any states, we have to have at least one signature on the transaction // Despite not having any states, we have to have at least one signature on the transaction
@ -93,8 +101,8 @@ fun recipient(rpc: CordaRPCOps) {
val stx = rpc.verifiedTransactions().second.toBlocking().first() val stx = rpc.verifiedTransactions().second.toBlocking().first()
val wtx = stx.tx val wtx = stx.tx
if (wtx.attachments.isNotEmpty()) { if (wtx.attachments.isNotEmpty()) {
assertEquals(PROSPECTUS_HASH, wtx.attachments.first()) assertEquals(EXPECTED_HASH, wtx.attachments.first())
require(rpc.attachmentExists(PROSPECTUS_HASH)) require(rpc.attachmentExists(EXPECTED_HASH))
println("File received - we're happy!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(wtx)}") println("File received - we're happy!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(wtx)}")
} else { } else {
println("Error: no attachments found in ${wtx.id}") println("Error: no attachments found in ${wtx.id}")