diff --git a/build.gradle b/build.gradle index f839df4c71..859260ce94 100644 --- a/build.gradle +++ b/build.gradle @@ -68,7 +68,7 @@ buildscript { ext.proguard_version = constants.getProperty('proguardVersion') ext.jsch_version = '0.1.54' ext.commons_cli_version = '1.4' - ext.protonj_version = '0.27.1' // This is now aligned with the Artemis version, but retaining in case we ever need to diverge again for a bug fix. + ext.protonj_version = '0.33.0' // Overide Artemis version ext.snappy_version = '0.4' ext.class_graph_version = '4.6.12' ext.jcabi_manifests_version = '1.1' diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 6cb39ecfd2..8ec6c84940 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -22,7 +22,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode import org.apache.qpid.proton.amqp.transport.SenderSettleMode import org.apache.qpid.proton.engine.* import org.apache.qpid.proton.message.Message -import org.apache.qpid.proton.message.ProtonJMessage import org.slf4j.MDC import java.net.InetSocketAddress import java.nio.ByteBuffer @@ -380,8 +379,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, val link = delivery.link if (link is Receiver) { if (delivery.isReadable && !delivery.isPartial) { - val pending = delivery.pending() - val amqpMessage = decodeAMQPMessage(pending, link) + val amqpMessage = decodeAMQPMessage(link) val payload = (amqpMessage.body as Data).value.array val connection = event.connection val channel = connection?.context as? Channel @@ -420,7 +418,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } } - private fun encodeAMQPMessage(message: ProtonJMessage): ByteBuf { + private fun encodeAMQPMessage(message: Message): ByteBuf { val buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500) try { try { @@ -438,7 +436,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf { - val message = Proton.message() as ProtonJMessage + val message = Proton.message() message.body = Data(Binary(msg.payload)) message.isDurable = true message.properties = Properties() @@ -450,16 +448,11 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, return encodeAMQPMessage(message) } - private fun decodeAMQPMessage(pending: Int, link: Receiver): Message { - val msgBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(pending) - try { - link.recv(NettyWritable(msgBuf)) - val amqpMessage = Proton.message() - amqpMessage.decode(msgBuf.array(), msgBuf.arrayOffset() + msgBuf.readerIndex(), msgBuf.readableBytes()) - return amqpMessage - } finally { - msgBuf.release() - } + private fun decodeAMQPMessage(link: Receiver): Message { + val amqpMessage = Proton.message() + val buf = link.recv() + amqpMessage.decode(buf) + return amqpMessage } fun transportWriteMessage(msg: SendableMessageImpl) {