mirror of
https://github.com/corda/corda.git
synced 2025-06-15 21:58:17 +00:00
CORDA-1416: Upgrade version of Proton-J library (#3050)
* CORDA-1416: Upgrade version of Proton-J library * CORDA-1416: Compilation fixes following Proton-J upgrade Reflects: https://issues.apache.org/jira/browse/PROTON-1712 and https://issues.apache.org/jira/browse/PROTON-1672 * CORDA-1416: Add an integration test to prove that data saved by from previous version can be read. * CORDA-1416: Add additional check validate serialized form.
This commit is contained in:
@ -130,7 +130,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<Any?, Any?>()
|
||||
val properties = HashMap<String, Any?>()
|
||||
for (key in P2PMessagingHeaders.whitelistedHeaders) {
|
||||
if (artemisMessage.containsProperty(key)) {
|
||||
var value = artemisMessage.getObjectProperty(key)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.nodeapi.internal.protonwrapper.engine
|
||||
|
||||
import io.netty.buffer.ByteBuf
|
||||
import org.apache.qpid.proton.codec.ReadableBuffer
|
||||
import org.apache.qpid.proton.codec.WritableBuffer
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
@ -57,6 +58,10 @@ internal class NettyWritable(val nettyBuffer: ByteBuf) : WritableBuffer {
|
||||
nettyBuffer.writeBytes(payload)
|
||||
}
|
||||
|
||||
override fun put(payload: ReadableBuffer) {
|
||||
nettyBuffer.writeBytes(payload.byteBuffer())
|
||||
}
|
||||
|
||||
override fun limit(): Int {
|
||||
return nettyBuffer.capacity()
|
||||
}
|
||||
|
@ -10,5 +10,5 @@ interface ApplicationMessage {
|
||||
val topic: String
|
||||
val destinationLegalName: String
|
||||
val destinationLink: NetworkHostAndPort
|
||||
val applicationProperties: Map<Any?, Any?>
|
||||
val applicationProperties: Map<String, Any?>
|
||||
}
|
@ -16,7 +16,7 @@ internal class ReceivedMessageImpl(override val payload: ByteArray,
|
||||
override val sourceLink: NetworkHostAndPort,
|
||||
override val destinationLegalName: String,
|
||||
override val destinationLink: NetworkHostAndPort,
|
||||
override val applicationProperties: Map<Any?, Any?>,
|
||||
override val applicationProperties: Map<String, Any?>,
|
||||
private val channel: Channel,
|
||||
private val delivery: Delivery) : ReceivedMessage {
|
||||
data class MessageCompleter(val status: MessageStatus, val delivery: Delivery)
|
||||
|
@ -15,7 +15,7 @@ internal class SendableMessageImpl(override val payload: ByteArray,
|
||||
override val topic: String,
|
||||
override val destinationLegalName: String,
|
||||
override val destinationLink: NetworkHostAndPort,
|
||||
override val applicationProperties: Map<Any?, Any?>) : SendableMessage {
|
||||
override val applicationProperties: Map<String, Any?>) : SendableMessage {
|
||||
var buf: ByteBuf? = null
|
||||
@Volatile
|
||||
var status: MessageStatus = MessageStatus.Unsent
|
||||
|
@ -170,7 +170,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
fun createMessage(payload: ByteArray,
|
||||
topic: String,
|
||||
destinationLegalName: String,
|
||||
properties: Map<Any?, Any?>): SendableMessage {
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties)
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,7 @@ class AMQPServer(val hostName: String,
|
||||
topic: String,
|
||||
destinationLegalName: String,
|
||||
destinationLink: NetworkHostAndPort,
|
||||
properties: Map<Any?, Any?>): SendableMessage {
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
val dest = InetSocketAddress(destinationLink.host, destinationLink.port)
|
||||
require(dest in clientChannels.keys) {
|
||||
"Destination not available"
|
||||
|
Reference in New Issue
Block a user