ENT-6693: Change the way how message sizes are determined (#7090)

Especially relevant for `AMQPLargeMessage`.
This commit is contained in:
Viktor Kolomeyko
2022-02-24 13:54:27 +00:00
committed by GitHub
parent 387cb30e2e
commit 162f76f710
2 changed files with 11 additions and 10 deletions

View File

@ -8,14 +8,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePac
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.qpid.proton.amqp.messaging.Data
class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor { class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor {
override fun getMessageSize(packet: Packet?): Int? { override fun getMessageSize(packet: Packet?): Long? {
return when (packet) { return when (packet) {
// This is an estimate of how much memory a Message body takes up. // This is an estimate of how much memory a Message body takes up.
// Note, it is only an estimate // Note, it is only an estimate
is MessagePacket -> (packet.message.persistentSize - packet.message.headersAndPropertiesEncodeSize - 4).toInt() is MessagePacket -> (packet.message.persistentSize - packet.message.headersAndPropertiesEncodeSize - 4)
// Skip all artemis control messages. // Skip all artemis control messages.
else -> null else -> null
} }
@ -23,7 +22,7 @@ class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChec
} }
class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor { class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor {
override fun getMessageSize(packet: AMQPMessage?): Int? = (packet?.protonMessage?.body as? Data)?.value?.length override fun getMessageSize(packet: AMQPMessage?): Long? = packet?.wholeMessageSize
} }
/** /**
@ -46,6 +45,6 @@ sealed class MessageSizeChecksInterceptor<T : Any>(private val maxMessageSize: I
} }
// get size of the message in byte, returns null if the message is null or size don't need to be checked. // get size of the message in byte, returns null if the message is null or size don't need to be checked.
abstract fun getMessageSize(packet: T?): Int? abstract fun getMessageSize(packet: T?): Long?
} }

View File

@ -291,9 +291,11 @@ class ProtonWrapperTests {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `Send a message larger then maxMessageSize from AMQP to Artemis inbox`() { fun `Send a message larger then maxMessageSize from AMQP to Artemis inbox`() {
val maxMessageSize = 100_000 val maxUserPayloadSize = 100_000
val (server, artemisClient) = createArtemisServerAndClient(maxMessageSize) val maxMessageSizeWithHeaders = maxUserPayloadSize + 512 // Adding a small "shim" to account for headers
val amqpClient = createClient(maxMessageSize) // and other non-payload bits of data
val (server, artemisClient) = createArtemisServerAndClient(maxMessageSizeWithHeaders)
val amqpClient = createClient(maxMessageSizeWithHeaders)
val clientConnected = amqpClient.onConnection.toFuture() val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start() amqpClient.start()
assertEquals(true, clientConnected.get().connected) assertEquals(true, clientConnected.get().connected)
@ -308,7 +310,7 @@ class ProtonWrapperTests {
testProperty["TestProp"] = "1" testProperty["TestProp"] = "1"
// Send normal message. // Send normal message.
val testData = ByteArray(maxMessageSize) val testData = ByteArray(maxUserPayloadSize)
val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty) val message = amqpClient.createMessage(testData, sendAddress, CHARLIE_NAME.toString(), testProperty)
amqpClient.write(message) amqpClient.write(message)
assertEquals(MessageStatus.Acknowledged, message.onComplete.get()) assertEquals(MessageStatus.Acknowledged, message.onComplete.get())
@ -317,7 +319,7 @@ class ProtonWrapperTests {
assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) }) assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) })
// Send message larger than max message size. // Send message larger than max message size.
val largeData = ByteArray(maxMessageSize + 1) val largeData = ByteArray(maxMessageSizeWithHeaders + 1)
// Create message will fail. // Create message will fail.
assertThatThrownBy { assertThatThrownBy {
amqpClient.createMessage(largeData, sendAddress, CHARLIE_NAME.toString(), testProperty) amqpClient.createMessage(largeData, sendAddress, CHARLIE_NAME.toString(), testProperty)