From 519644ce0de21bef7ee16109184816041734b47e Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 7 Mar 2018 08:56:58 +0000 Subject: [PATCH 1/2] CORDA-1170: Define and whitelist the Artemis/AMQP application headers that are accepted by Corda (#2728) * Whitelist headers copied across bridges * Address PR comments --- .../internal/ArtemisMessagingComponent.kt | 45 +++++++++++++++++++ .../internal/bridging/AMQPBridgeManager.kt | 13 +++--- .../net/corda/node/amqp/AMQPBridgeTest.kt | 15 ++++--- .../node/services/messaging/Messaging.kt | 7 --- .../services/messaging/P2PMessagingClient.kt | 23 ++++------ .../statemachine/StateMachineManagerImpl.kt | 8 +--- 6 files changed, 71 insertions(+), 40 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt index cd9ae5bc6d..4c55f77e87 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt @@ -7,6 +7,8 @@ import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.NetworkHostAndPort +import org.apache.activemq.artemis.api.core.Message +import org.apache.activemq.artemis.api.core.SimpleString import java.security.PublicKey /** @@ -28,6 +30,49 @@ class ArtemisMessagingComponent { const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control" const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" + + /** + * In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header + * as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward + * the equivalent information from the Float. + */ + val bridgedCertificateSubject = SimpleString("sender-subject-name") + + object P2PMessagingHeaders { + // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". + // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint + // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid + // confusion. + val topicProperty = SimpleString("platform-topic") + val cordaVendorProperty = SimpleString("corda-vendor") + val releaseVersionProperty = SimpleString("release-version") + val platformVersionProperty = SimpleString("platform-version") + val senderUUID = SimpleString("sender-uuid") + val senderSeqNo = SimpleString("send-seq-no") + /** + * In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header + * as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward + * the equivalent information from the Float. + */ + val bridgedCertificateSubject = SimpleString("sender-subject-name") + + + object Type { + const val KEY = "corda_p2p_message_type" + const val SESSION_INIT_VALUE = "session_init" + } + + val whitelistedHeaders: Set = setOf(topicProperty.toString(), + cordaVendorProperty.toString(), + releaseVersionProperty.toString(), + platformVersionProperty.toString(), + senderUUID.toString(), + senderSeqNo.toString(), + bridgedCertificateSubject.toString(), + Type.KEY, + Message.HDR_DUPLICATE_DETECTION_ID.toString(), + Message.HDR_VALIDATED_USER.toString()) + } } interface ArtemisAddress : MessageRecipients { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 13b73bf3b5..2c771cfd13 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -10,6 +10,7 @@ import net.corda.core.utilities.debug import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider @@ -128,12 +129,14 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } val properties = HashMap() - for (key in artemisMessage.propertyNames) { - var value = artemisMessage.getObjectProperty(key) - if (value is SimpleString) { - value = value.toString() + for (key in P2PMessagingHeaders.whitelistedHeaders) { + if (artemisMessage.containsProperty(key)) { + var value = artemisMessage.getObjectProperty(key) + if (value is SimpleString) { + value = value.toString() + } + properties[key] = value } - properties[key.toString()] = value } log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } val peerInbox = translateLocalQueueToInboxAddress(queueName) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 38804f56d2..1b5dd8f5e0 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -12,6 +12,7 @@ import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.bridging.AMQPBridgeManager import net.corda.nodeapi.internal.bridging.BridgeManager import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer @@ -56,7 +57,7 @@ class AMQPBridgeTest { val artemis = artemisClient.started!! for (i in 0 until 3) { val artemisMessage = artemis.session.createMessage(true).apply { - putIntProperty("CountProp", i) + putIntProperty(P2PMessagingHeaders.senderUUID, i) writeBodyBufferBytes("Test$i".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) @@ -80,7 +81,7 @@ class AMQPBridgeTest { } val received1 = receive.next() - val messageID1 = received1.applicationProperties["CountProp"] as Int + val messageID1 = received1.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int assertArrayEquals("Test$messageID1".toByteArray(), received1.payload) assertEquals(0, messageID1) dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String @@ -89,7 +90,7 @@ class AMQPBridgeTest { atNodeSequence += messageID1 val received2 = receive.next() - val messageID2 = received2.applicationProperties["CountProp"] as Int + val messageID2 = received2.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int assertArrayEquals("Test$messageID2".toByteArray(), received2.payload) assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence)) received2.complete(false) // Reject message and don't add to dedupe @@ -98,7 +99,7 @@ class AMQPBridgeTest { // drop things until we get back to the replay while (true) { val received3 = receive.next() - val messageID3 = received3.applicationProperties["CountProp"] as Int + val messageID3 = received3.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int assertArrayEquals("Test$messageID3".toByteArray(), received3.payload) receivedSequence += messageID3 if (messageID3 != 1) { // keep rejecting any batched items following rejection @@ -117,7 +118,7 @@ class AMQPBridgeTest { // start receiving again, but discarding duplicates while (true) { val received4 = receive.next() - val messageID4 = received4.applicationProperties["CountProp"] as Int + val messageID4 = received4.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int assertArrayEquals("Test$messageID4".toByteArray(), received4.payload) receivedSequence += messageID4 val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String @@ -133,7 +134,7 @@ class AMQPBridgeTest { // Send a fresh item and check receive val artemisMessage = artemis.session.createMessage(true).apply { - putIntProperty("CountProp", 3) + putIntProperty(P2PMessagingHeaders.senderUUID, 3) writeBodyBufferBytes("Test3".toByteArray()) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) @@ -144,7 +145,7 @@ class AMQPBridgeTest { // start receiving again, discarding duplicates while (true) { val received5 = receive.next() - val messageID5 = received5.applicationProperties["CountProp"] as Int + val messageID5 = received5.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int assertArrayEquals("Test$messageID5".toByteArray(), received5.payload) receivedSequence += messageID5 val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 383705f518..24620f274e 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -147,10 +147,3 @@ object TopicStringValidator { fun check(tag: String) = require(regex.matcher(tag).matches()) } -object P2PMessagingHeaders { - - object Type { - const val KEY = "corda_p2p_message_type" - const val SESSION_INIT_VALUE = "session_init" - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 5f050d2ec5..b115201602 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -30,6 +30,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry @@ -97,14 +98,6 @@ class P2PMessagingClient(private val config: NodeConfiguration, ) : SingletonSerializeAsToken(), MessagingService, AutoCloseable { companion object { private val log = contextLogger() - // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". - // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint - // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid - // confusion. - private val topicProperty = SimpleString("platform-topic") - private val cordaVendorProperty = SimpleString("corda-vendor") - private val releaseVersionProperty = SimpleString("release-version") - private val platformVersionProperty = SimpleString("platform-version") private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt() private const val messageMaxRetryCount: Int = 3 @@ -392,9 +385,9 @@ class P2PMessagingClient(private val config: NodeConfiguration, private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? { try { - val topic = message.required(topicProperty) { getStringProperty(it) } + val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) } val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" } - val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) } + val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) } // Use the magic deduplication property built into Artemis as our message identity too val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { message.getStringProperty(it) } log.info("Received message from: ${message.address} user: $user topic: $topic uuid: $uuid") @@ -523,13 +516,13 @@ class P2PMessagingClient(private val config: NodeConfiguration, state.locked { val mqAddress = getMQAddress(target) val artemisMessage = producerSession!!.createMessage(true).apply { - putStringProperty(cordaVendorProperty, cordaVendor) - putStringProperty(releaseVersionProperty, releaseVersion) - putIntProperty(platformVersionProperty, versionInfo.platformVersion) - putStringProperty(topicProperty, SimpleString(message.topic)) + putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) + putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) + putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion) + putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic)) writeBodyBufferBytes(message.data.bytes) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString())) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId)) // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt index 1ac69e1a59..a4fcc7a440 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt @@ -19,12 +19,8 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.FlowStateMachine -import net.corda.core.internal.ThreadBox -import net.corda.core.internal.bufferUntilSubscribed -import net.corda.core.internal.castIfPossible +import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture -import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed import net.corda.core.serialization.SerializationDefaults.CHECKPOINT_CONTEXT import net.corda.core.serialization.SerializationDefaults.SERIALIZATION_FACTORY @@ -40,10 +36,10 @@ import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints -import net.corda.node.services.messaging.P2PMessagingHeaders import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.newNamedSingleThreadExecutor +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction From b6831eed6aa1dc47d8d1683f71564d52768d2b5f Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 7 Mar 2018 11:37:46 +0000 Subject: [PATCH 2/2] Merge remote-tracking branch 'remotes/open/master' into mnesbit-merge-20180307 # Conflicts: # node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt # node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt # node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt --- .../net/corda/node/amqp/AMQPBridgeTest.kt | 5 +---- .../services/messaging/MessagingExecutor.kt | 15 ++++++------- .../services/messaging/P2PMessagingClient.kt | 22 +++++-------------- .../services/statemachine/FlowMessaging.kt | 2 +- 4 files changed, 14 insertions(+), 30 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 7953574dcb..cdd057dcdd 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -15,11 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort -import net.corda.node.services.config.* import net.corda.core.utilities.loggerFor -import net.corda.node.services.config.CertChainPolicyConfig -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.config.* import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index a49141160d..0da1beccd4 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -18,13 +18,12 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException -import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientProducer import org.apache.activemq.artemis.api.core.client.ClientSession -import java.util.* import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ExecutionException import java.util.concurrent.atomic.AtomicLong @@ -161,18 +160,18 @@ class MessagingExecutor( internal fun cordaToArtemisMessage(message: Message): ClientMessage? { return session.createMessage(true).apply { - putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor) - putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion) - putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion) - putStringProperty(P2PMessagingClient.topicProperty, SimpleString(message.topic)) + putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) + putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) + putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion) + putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic)) sendMessageSizeMetric.update(message.data.bytes.size) writeBodyBufferBytes(message.data.bytes) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString)) // If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut. if (ourSenderUUID == message.senderUUID) { - putStringProperty(P2PMessagingClient.senderUUID, SimpleString(ourSenderUUID)) - putLongProperty(P2PMessagingClient.senderSeqNo, ourSenderSeqNo.getAndIncrement()) + putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID)) + putLongProperty(P2PMessagingHeaders.senderSeqNo, ourSenderSeqNo.getAndIncrement()) } // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 1f7c18d442..add62c381a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -27,11 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.trace +import net.corda.core.utilities.* import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex @@ -43,14 +39,11 @@ import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -60,12 +53,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ActiveMQClient -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import org.apache.activemq.artemis.api.core.client.ClientMessage -import org.apache.activemq.artemis.api.core.client.ClientProducer -import org.apache.activemq.artemis.api.core.client.ClientSession -import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.api.core.client.* import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import rx.Observable import rx.Subscription @@ -399,8 +387,8 @@ class P2PMessagingClient(val config: NodeConfiguration, val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) } // Use the magic deduplication property built into Artemis as our message identity too val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) } - val receivedSenderUUID = message.getStringProperty(senderUUID) - val receivedSenderSeqNo = if (message.containsProperty(senderSeqNo)) message.getLongProperty(senderSeqNo) else null + val receivedSenderUUID = message.getStringProperty(P2PMessagingHeaders.senderUUID) + val receivedSenderSeqNo = if (message.containsProperty(P2PMessagingHeaders.senderSeqNo)) message.getLongProperty(P2PMessagingHeaders.senderSeqNo) else null log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" } return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 431a3701ed..017e8dda6b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -20,8 +20,8 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.AcknowledgeHandle -import net.corda.node.services.messaging.P2PMessagingHeaders import net.corda.node.services.messaging.ReceivedMessage +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import java.io.NotSerializableException /**