diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 7953574dcb..cdd057dcdd 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -15,11 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort -import net.corda.node.services.config.* import net.corda.core.utilities.loggerFor -import net.corda.node.services.config.CertChainPolicyConfig -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.config.* import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index a49141160d..0da1beccd4 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -18,13 +18,12 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException -import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientProducer import org.apache.activemq.artemis.api.core.client.ClientSession -import java.util.* import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ExecutionException import java.util.concurrent.atomic.AtomicLong @@ -161,18 +160,18 @@ class MessagingExecutor( internal fun cordaToArtemisMessage(message: Message): ClientMessage? { return session.createMessage(true).apply { - putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor) - putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion) - putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion) - putStringProperty(P2PMessagingClient.topicProperty, SimpleString(message.topic)) + putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor) + putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion) + putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion) + putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic)) sendMessageSizeMetric.update(message.data.bytes.size) writeBodyBufferBytes(message.data.bytes) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString)) // If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut. if (ourSenderUUID == message.senderUUID) { - putStringProperty(P2PMessagingClient.senderUUID, SimpleString(ourSenderUUID)) - putLongProperty(P2PMessagingClient.senderSeqNo, ourSenderSeqNo.getAndIncrement()) + putStringProperty(P2PMessagingHeaders.senderUUID, SimpleString(ourSenderUUID)) + putLongProperty(P2PMessagingHeaders.senderSeqNo, ourSenderSeqNo.getAndIncrement()) } // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 1f7c18d442..add62c381a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -27,11 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.trace +import net.corda.core.utilities.* import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex @@ -43,14 +39,11 @@ import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -60,12 +53,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ActiveMQClient -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import org.apache.activemq.artemis.api.core.client.ClientMessage -import org.apache.activemq.artemis.api.core.client.ClientProducer -import org.apache.activemq.artemis.api.core.client.ClientSession -import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.api.core.client.* import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import rx.Observable import rx.Subscription @@ -399,8 +387,8 @@ class P2PMessagingClient(val config: NodeConfiguration, val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) } // Use the magic deduplication property built into Artemis as our message identity too val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) } - val receivedSenderUUID = message.getStringProperty(senderUUID) - val receivedSenderSeqNo = if (message.containsProperty(senderSeqNo)) message.getLongProperty(senderSeqNo) else null + val receivedSenderUUID = message.getStringProperty(P2PMessagingHeaders.senderUUID) + val receivedSenderSeqNo = if (message.containsProperty(P2PMessagingHeaders.senderSeqNo)) message.getLongProperty(P2PMessagingHeaders.senderSeqNo) else null log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" } return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 431a3701ed..017e8dda6b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -20,8 +20,8 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.AcknowledgeHandle -import net.corda.node.services.messaging.P2PMessagingHeaders import net.corda.node.services.messaging.ReceivedMessage +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import java.io.NotSerializableException /**