mirror of
https://github.com/corda/corda.git
synced 2025-02-22 10:10:59 +00:00
Update the proton-j library to latest version (#5105)
This commit is contained in:
parent
6769b00ed5
commit
7bcff70864
@ -68,7 +68,7 @@ buildscript {
|
|||||||
ext.proguard_version = constants.getProperty('proguardVersion')
|
ext.proguard_version = constants.getProperty('proguardVersion')
|
||||||
ext.jsch_version = '0.1.54'
|
ext.jsch_version = '0.1.54'
|
||||||
ext.commons_cli_version = '1.4'
|
ext.commons_cli_version = '1.4'
|
||||||
ext.protonj_version = '0.27.1' // This is now aligned with the Artemis version, but retaining in case we ever need to diverge again for a bug fix.
|
ext.protonj_version = '0.33.0' // Overide Artemis version
|
||||||
ext.snappy_version = '0.4'
|
ext.snappy_version = '0.4'
|
||||||
ext.class_graph_version = '4.6.12'
|
ext.class_graph_version = '4.6.12'
|
||||||
ext.jcabi_manifests_version = '1.1'
|
ext.jcabi_manifests_version = '1.1'
|
||||||
|
@ -22,7 +22,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode
|
|||||||
import org.apache.qpid.proton.amqp.transport.SenderSettleMode
|
import org.apache.qpid.proton.amqp.transport.SenderSettleMode
|
||||||
import org.apache.qpid.proton.engine.*
|
import org.apache.qpid.proton.engine.*
|
||||||
import org.apache.qpid.proton.message.Message
|
import org.apache.qpid.proton.message.Message
|
||||||
import org.apache.qpid.proton.message.ProtonJMessage
|
|
||||||
import org.slf4j.MDC
|
import org.slf4j.MDC
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
@ -380,8 +379,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
|||||||
val link = delivery.link
|
val link = delivery.link
|
||||||
if (link is Receiver) {
|
if (link is Receiver) {
|
||||||
if (delivery.isReadable && !delivery.isPartial) {
|
if (delivery.isReadable && !delivery.isPartial) {
|
||||||
val pending = delivery.pending()
|
val amqpMessage = decodeAMQPMessage(link)
|
||||||
val amqpMessage = decodeAMQPMessage(pending, link)
|
|
||||||
val payload = (amqpMessage.body as Data).value.array
|
val payload = (amqpMessage.body as Data).value.array
|
||||||
val connection = event.connection
|
val connection = event.connection
|
||||||
val channel = connection?.context as? Channel
|
val channel = connection?.context as? Channel
|
||||||
@ -420,7 +418,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun encodeAMQPMessage(message: ProtonJMessage): ByteBuf {
|
private fun encodeAMQPMessage(message: Message): ByteBuf {
|
||||||
val buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500)
|
val buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1500)
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
@ -438,7 +436,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf {
|
private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf {
|
||||||
val message = Proton.message() as ProtonJMessage
|
val message = Proton.message()
|
||||||
message.body = Data(Binary(msg.payload))
|
message.body = Data(Binary(msg.payload))
|
||||||
message.isDurable = true
|
message.isDurable = true
|
||||||
message.properties = Properties()
|
message.properties = Properties()
|
||||||
@ -450,16 +448,11 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
|
|||||||
return encodeAMQPMessage(message)
|
return encodeAMQPMessage(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun decodeAMQPMessage(pending: Int, link: Receiver): Message {
|
private fun decodeAMQPMessage(link: Receiver): Message {
|
||||||
val msgBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(pending)
|
|
||||||
try {
|
|
||||||
link.recv(NettyWritable(msgBuf))
|
|
||||||
val amqpMessage = Proton.message()
|
val amqpMessage = Proton.message()
|
||||||
amqpMessage.decode(msgBuf.array(), msgBuf.arrayOffset() + msgBuf.readerIndex(), msgBuf.readableBytes())
|
val buf = link.recv()
|
||||||
|
amqpMessage.decode(buf)
|
||||||
return amqpMessage
|
return amqpMessage
|
||||||
} finally {
|
|
||||||
msgBuf.release()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fun transportWriteMessage(msg: SendableMessageImpl) {
|
fun transportWriteMessage(msg: SendableMessageImpl) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user