mirror of
https://github.com/corda/corda.git
synced 2025-01-28 07:04:12 +00:00
Merge pull request #524 from corda/mnesbit-merge-20180307
Merge up from OS master including Artemis headers whitelisting
This commit is contained in:
commit
c63311343b
@ -17,6 +17,8 @@ import net.corda.core.messaging.MessageRecipients
|
|||||||
import net.corda.core.messaging.SingleMessageRecipient
|
import net.corda.core.messaging.SingleMessageRecipient
|
||||||
import net.corda.core.serialization.CordaSerializable
|
import net.corda.core.serialization.CordaSerializable
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import org.apache.activemq.artemis.api.core.Message
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -38,6 +40,49 @@ class ArtemisMessagingComponent {
|
|||||||
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
|
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
|
||||||
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
|
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
|
||||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header
|
||||||
|
* as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward
|
||||||
|
* the equivalent information from the Float.
|
||||||
|
*/
|
||||||
|
val bridgedCertificateSubject = SimpleString("sender-subject-name")
|
||||||
|
|
||||||
|
object P2PMessagingHeaders {
|
||||||
|
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
|
||||||
|
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||||
|
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
||||||
|
// confusion.
|
||||||
|
val topicProperty = SimpleString("platform-topic")
|
||||||
|
val cordaVendorProperty = SimpleString("corda-vendor")
|
||||||
|
val releaseVersionProperty = SimpleString("release-version")
|
||||||
|
val platformVersionProperty = SimpleString("platform-version")
|
||||||
|
val senderUUID = SimpleString("sender-uuid")
|
||||||
|
val senderSeqNo = SimpleString("send-seq-no")
|
||||||
|
/**
|
||||||
|
* In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header
|
||||||
|
* as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward
|
||||||
|
* the equivalent information from the Float.
|
||||||
|
*/
|
||||||
|
val bridgedCertificateSubject = SimpleString("sender-subject-name")
|
||||||
|
|
||||||
|
|
||||||
|
object Type {
|
||||||
|
const val KEY = "corda_p2p_message_type"
|
||||||
|
const val SESSION_INIT_VALUE = "session_init"
|
||||||
|
}
|
||||||
|
|
||||||
|
val whitelistedHeaders: Set<String> = setOf(topicProperty.toString(),
|
||||||
|
cordaVendorProperty.toString(),
|
||||||
|
releaseVersionProperty.toString(),
|
||||||
|
platformVersionProperty.toString(),
|
||||||
|
senderUUID.toString(),
|
||||||
|
senderSeqNo.toString(),
|
||||||
|
bridgedCertificateSubject.toString(),
|
||||||
|
Type.KEY,
|
||||||
|
Message.HDR_DUPLICATE_DETECTION_ID.toString(),
|
||||||
|
Message.HDR_VALIDATED_USER.toString())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ArtemisAddress : MessageRecipients {
|
interface ArtemisAddress : MessageRecipients {
|
||||||
|
@ -20,6 +20,7 @@ import net.corda.core.utilities.debug
|
|||||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||||
@ -138,12 +139,14 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
|
|||||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||||
val properties = HashMap<Any?, Any?>()
|
val properties = HashMap<Any?, Any?>()
|
||||||
for (key in artemisMessage.propertyNames) {
|
for (key in P2PMessagingHeaders.whitelistedHeaders) {
|
||||||
|
if (artemisMessage.containsProperty(key)) {
|
||||||
var value = artemisMessage.getObjectProperty(key)
|
var value = artemisMessage.getObjectProperty(key)
|
||||||
if (value is SimpleString) {
|
if (value is SimpleString) {
|
||||||
value = value.toString()
|
value = value.toString()
|
||||||
}
|
}
|
||||||
properties[key.toString()] = value
|
properties[key] = value
|
||||||
|
}
|
||||||
}
|
}
|
||||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||||
|
@ -15,14 +15,12 @@ import com.nhaarman.mockito_kotlin.whenever
|
|||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.services.config.*
|
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
import net.corda.node.services.config.CertChainPolicyConfig
|
import net.corda.node.services.config.*
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeManager
|
import net.corda.nodeapi.internal.bridging.BridgeManager
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||||
@ -73,7 +71,7 @@ class AMQPBridgeTest {
|
|||||||
val artemis = artemisClient.started!!
|
val artemis = artemisClient.started!!
|
||||||
for (i in 0 until 3) {
|
for (i in 0 until 3) {
|
||||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||||
putIntProperty("CountProp", i)
|
putIntProperty(P2PMessagingHeaders.senderUUID, i)
|
||||||
writeBodyBufferBytes("Test$i".toByteArray())
|
writeBodyBufferBytes("Test$i".toByteArray())
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
// Use the magic deduplication property built into Artemis as our message identity too
|
||||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||||
@ -97,7 +95,7 @@ class AMQPBridgeTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val received1 = receive.next()
|
val received1 = receive.next()
|
||||||
val messageID1 = received1.applicationProperties["CountProp"] as Int
|
val messageID1 = received1.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
|
||||||
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
|
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
|
||||||
assertEquals(0, messageID1)
|
assertEquals(0, messageID1)
|
||||||
dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||||
@ -106,7 +104,7 @@ class AMQPBridgeTest {
|
|||||||
atNodeSequence += messageID1
|
atNodeSequence += messageID1
|
||||||
|
|
||||||
val received2 = receive.next()
|
val received2 = receive.next()
|
||||||
val messageID2 = received2.applicationProperties["CountProp"] as Int
|
val messageID2 = received2.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
|
||||||
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
|
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
|
||||||
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
|
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
|
||||||
received2.complete(false) // Reject message and don't add to dedupe
|
received2.complete(false) // Reject message and don't add to dedupe
|
||||||
@ -115,7 +113,7 @@ class AMQPBridgeTest {
|
|||||||
// drop things until we get back to the replay
|
// drop things until we get back to the replay
|
||||||
while (true) {
|
while (true) {
|
||||||
val received3 = receive.next()
|
val received3 = receive.next()
|
||||||
val messageID3 = received3.applicationProperties["CountProp"] as Int
|
val messageID3 = received3.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
|
||||||
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
|
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
|
||||||
receivedSequence += messageID3
|
receivedSequence += messageID3
|
||||||
if (messageID3 != 1) { // keep rejecting any batched items following rejection
|
if (messageID3 != 1) { // keep rejecting any batched items following rejection
|
||||||
@ -134,7 +132,7 @@ class AMQPBridgeTest {
|
|||||||
// start receiving again, but discarding duplicates
|
// start receiving again, but discarding duplicates
|
||||||
while (true) {
|
while (true) {
|
||||||
val received4 = receive.next()
|
val received4 = receive.next()
|
||||||
val messageID4 = received4.applicationProperties["CountProp"] as Int
|
val messageID4 = received4.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
|
||||||
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
|
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
|
||||||
receivedSequence += messageID4
|
receivedSequence += messageID4
|
||||||
val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||||
@ -150,7 +148,7 @@ class AMQPBridgeTest {
|
|||||||
|
|
||||||
// Send a fresh item and check receive
|
// Send a fresh item and check receive
|
||||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||||
putIntProperty("CountProp", 3)
|
putIntProperty(P2PMessagingHeaders.senderUUID, 3)
|
||||||
writeBodyBufferBytes("Test3".toByteArray())
|
writeBodyBufferBytes("Test3".toByteArray())
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
// Use the magic deduplication property built into Artemis as our message identity too
|
||||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||||
@ -161,7 +159,7 @@ class AMQPBridgeTest {
|
|||||||
// start receiving again, discarding duplicates
|
// start receiving again, discarding duplicates
|
||||||
while (true) {
|
while (true) {
|
||||||
val received5 = receive.next()
|
val received5 = receive.next()
|
||||||
val messageID5 = received5.applicationProperties["CountProp"] as Int
|
val messageID5 = received5.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
|
||||||
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload)
|
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload)
|
||||||
receivedSequence += messageID5
|
receivedSequence += messageID5
|
||||||
val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
|
||||||
|
@ -176,10 +176,3 @@ interface AcknowledgeHandle {
|
|||||||
|
|
||||||
typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, AcknowledgeHandle) -> Unit
|
typealias MessageHandler = (ReceivedMessage, MessageHandlerRegistration, AcknowledgeHandle) -> Unit
|
||||||
|
|
||||||
object P2PMessagingHeaders {
|
|
||||||
|
|
||||||
object Type {
|
|
||||||
const val KEY = "corda_p2p_message_type"
|
|
||||||
const val SESSION_INIT_VALUE = "session_init"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -18,13 +18,12 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.trace
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.services.statemachine.FlowMessagingImpl
|
import net.corda.node.services.statemachine.FlowMessagingImpl
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
|
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
import java.util.*
|
|
||||||
import java.util.concurrent.ArrayBlockingQueue
|
import java.util.concurrent.ArrayBlockingQueue
|
||||||
import java.util.concurrent.ExecutionException
|
import java.util.concurrent.ExecutionException
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
@ -161,18 +160,18 @@ class MessagingExecutor(
|
|||||||
|
|
||||||
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
|
||||||
return session.createMessage(true).apply {
|
return session.createMessage(true).apply {
|
||||||
putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor)
|
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
|
||||||
putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion)
|
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
|
||||||
putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion)
|
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
|
||||||
putStringProperty(P2PMessagingClient.topicProperty, SimpleString(message.topic))
|
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
|
||||||
sendMessageSizeMetric.update(message.data.bytes.size)
|
sendMessageSizeMetric.update(message.data.bytes.size)
|
||||||
writeBodyBufferBytes(message.data.bytes)
|
writeBodyBufferBytes(message.data.bytes)
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
// Use the magic deduplication property built into Artemis as our message identity too
|
||||||
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
|
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
|
||||||
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
|
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
|
||||||
if (ourSenderUUID == message.senderUUID) {
|
if (ourSenderUUID == message.senderUUID) {
|
||||||
putStringProperty(P2PMessagingClient.senderUUID, SimpleString(ourSenderUUID))
|
putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID))
|
||||||
putLongProperty(P2PMessagingClient.senderSeqNo, ourSenderSeqNo.getAndIncrement())
|
putLongProperty(P2PMessagingHeaders.senderSeqNo, ourSenderSeqNo.getAndIncrement())
|
||||||
}
|
}
|
||||||
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
|
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
|
||||||
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
|
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
|
||||||
|
@ -27,11 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.ByteSequence
|
import net.corda.core.utilities.*
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
|
||||||
import net.corda.core.utilities.OpaqueBytes
|
|
||||||
import net.corda.core.utilities.contextLogger
|
|
||||||
import net.corda.core.utilities.trace
|
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.LifecycleSupport
|
import net.corda.node.internal.LifecycleSupport
|
||||||
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
|
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
|
||||||
@ -43,13 +39,11 @@ import net.corda.node.utilities.PersistentMap
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
import net.corda.nodeapi.ConnectionDirection
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -59,12 +53,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
|||||||
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
import org.apache.activemq.artemis.api.core.client.*
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
|
||||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
@ -124,17 +113,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
|
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
|
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
|
||||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
|
||||||
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
|
||||||
// confusion.
|
|
||||||
val topicProperty = SimpleString("platform-topic")
|
|
||||||
val cordaVendorProperty = SimpleString("corda-vendor")
|
|
||||||
val releaseVersionProperty = SimpleString("release-version")
|
|
||||||
val platformVersionProperty = SimpleString("platform-version")
|
|
||||||
val senderUUID = SimpleString("sender-uuid")
|
|
||||||
val senderSeqNo = SimpleString("send-seq-no")
|
|
||||||
|
|
||||||
private const val messageMaxRetryCount: Int = 3
|
private const val messageMaxRetryCount: Int = 3
|
||||||
|
|
||||||
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
|
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
|
||||||
@ -403,13 +382,13 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
|
|
||||||
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
|
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
|
||||||
try {
|
try {
|
||||||
val topic = message.required(topicProperty) { getStringProperty(it) }
|
val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) }
|
||||||
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
|
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
|
||||||
val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) }
|
val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) }
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
// Use the magic deduplication property built into Artemis as our message identity too
|
||||||
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
|
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
|
||||||
val receivedSenderUUID = message.getStringProperty(senderUUID)
|
val receivedSenderUUID = message.getStringProperty(P2PMessagingHeaders.senderUUID)
|
||||||
val receivedSenderSeqNo = if (message.containsProperty(senderSeqNo)) message.getLongProperty(senderSeqNo) else null
|
val receivedSenderSeqNo = if (message.containsProperty(P2PMessagingHeaders.senderSeqNo)) message.getLongProperty(P2PMessagingHeaders.senderSeqNo) else null
|
||||||
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" }
|
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" }
|
||||||
|
|
||||||
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message)
|
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message)
|
||||||
|
@ -20,8 +20,8 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.core.utilities.trace
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import net.corda.node.services.messaging.AcknowledgeHandle
|
import net.corda.node.services.messaging.AcknowledgeHandle
|
||||||
import net.corda.node.services.messaging.P2PMessagingHeaders
|
|
||||||
import net.corda.node.services.messaging.ReceivedMessage
|
import net.corda.node.services.messaging.ReceivedMessage
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
x
Reference in New Issue
Block a user