From ef45900fda2f286731fbed30673bed4919f80199 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Mon, 12 Feb 2018 18:48:34 +0000 Subject: [PATCH] ENT-1391 Create P2P message de-duplication fast path. (#443) --- .../node/services/messaging/Messaging.kt | 2 + .../services/messaging/MessagingExecutor.kt | 43 ++++--- .../messaging/P2PMessageDeduplicator.kt | 98 ++++++++++++++ .../services/messaging/P2PMessagingClient.kt | 67 ++++------ .../node/services/schema/NodeSchemaService.kt | 3 +- .../messaging/ArtemisMessagingTest.kt | 121 ++++++++++++++++++ .../testing/node/InMemoryMessagingNetwork.kt | 8 +- 7 files changed, 283 insertions(+), 59 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 03b6e33517..39f74f78cf 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -126,6 +126,7 @@ interface Message { val data: ByteSequence val debugTimestamp: Instant val uniqueMessageId: DeduplicationId + val senderUUID: String? } // TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that. @@ -136,6 +137,7 @@ interface ReceivedMessage : Message { val peer: CordaX500Name /** Platform version of the sender's node. */ val platformVersion: Int + val senderSeqNo: Long? } /** A singleton that's useful for validating topic strings */ diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index a0e661890a..244fc0a73f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -17,6 +17,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession import java.util.* import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ExecutionException +import java.util.concurrent.atomic.AtomicLong import kotlin.concurrent.thread interface AddressToArtemisQueueResolver { @@ -38,6 +39,7 @@ class MessagingExecutor( val versionInfo: VersionInfo, val resolver: AddressToArtemisQueueResolver, metricRegistry: MetricRegistry, + val ourSenderUUID: String, queueBound: Int ) { private sealed class Job { @@ -59,6 +61,7 @@ class MessagingExecutor( private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize") private val sendLatencyMetric = metricRegistry.timer("SendLatency") private val sendBatchSizeMetric = metricRegistry.histogram("SendBatchSize") + private val ourSenderSeqNo = AtomicLong() private companion object { val log = contextLogger() @@ -193,26 +196,34 @@ class MessagingExecutor( private fun sendJob(job: Job.Send) { val mqAddress = resolver.resolveTargetToArtemisQueue(job.target) - val artemisMessage = session.createMessage(true).apply { - putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor) - putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion) - putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion) - putStringProperty(P2PMessagingClient.topicProperty, SimpleString(job.message.topic)) - sendMessageSizeMetric.update(job.message.data.bytes.size) - writeBodyBufferBytes(job.message.data.bytes) - // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(job.message.uniqueMessageId.toString)) - - // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended - if (amqDelayMillis > 0 && job.message.topic == FlowMessagingImpl.sessionTopic) { - putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis) - } - } + val artemisMessage = cordaToArtemisMessage(job.message) log.trace { "Send to: $mqAddress topic: ${job.message.topic} " + "sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}" } - producer.send(SimpleString(mqAddress), artemisMessage) { job.sentFuture.set(Unit) } + producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) }) + } + + internal fun cordaToArtemisMessage(message: Message): ClientMessage? { + return session.createMessage(true).apply { + putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor) + putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion) + putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion) + putStringProperty(P2PMessagingClient.topicProperty, SimpleString(message.topic)) + sendMessageSizeMetric.update(message.data.bytes.size) + writeBodyBufferBytes(message.data.bytes) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString)) + // If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut. + if (ourSenderUUID == message.senderUUID) { + putStringProperty(P2PMessagingClient.senderUUID, SimpleString(ourSenderUUID)) + putLongProperty(P2PMessagingClient.senderSeqNo, ourSenderSeqNo.getAndIncrement()) + } + // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended + if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) { + putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis) + } + } } private fun acknowledgeJob(job: Job.Acknowledge) { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt new file mode 100644 index 0000000000..4ffeee86e5 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt @@ -0,0 +1,98 @@ +package net.corda.node.services.messaging + +import com.google.common.cache.CacheBuilder +import net.corda.core.identity.CordaX500Name +import net.corda.node.services.statemachine.DeduplicationId +import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX +import java.time.Instant +import java.util.* +import java.util.concurrent.TimeUnit +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id + +/** + * Encapsulate the de-duplication logic. + */ +class P2PMessageDeduplicator(private val database: CordaPersistence) { + val ourSenderUUID = UUID.randomUUID().toString() + + private val processedMessages = createProcessedMessages() + // We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers. + // Expire after 7 days since we last touched an entry, to avoid infinite growth. + private val senderUUIDSeqNoHWM: MutableMap, Long> = CacheBuilder.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build, Long>().asMap() + + private fun createProcessedMessages(): AppendOnlyPersistentMap { + return AppendOnlyPersistentMap( + toPersistentEntityKey = { it.toString }, + fromPersistentEntity = { Pair(DeduplicationId(it.id), it.insertionTime) }, + toPersistentEntity = { key: DeduplicationId, value: Instant -> + ProcessedMessage().apply { + id = key.toString + insertionTime = value + } + }, + persistentEntityClass = ProcessedMessage::class.java + ) + } + + private fun isDuplicateInDatabase(msg: ReceivedMessage): Boolean = database.transaction { msg.uniqueMessageId in processedMessages } + + /** + * We assign the sender a random identifier [ourSenderUUID] (passed to [MessagingExecutor]). If the sender is also the creator of a message + * (i.e. not from recovered checkpoints), assign a sequence number. Recipients know it is not a duplicate if the sequence number + * is greater than the highest they have seen, otherwise fallback to prior, slower, logic within the database. + * + * The UUIDs will change each time the sender restarts their JVM, and so we may need to prune UUIDs over time. To this end, + * we only remember UUIDs for 7 days from the time we last interacted with them, rebuilding the cached value if we + * ever re-encounter them. + * + * We also ensure the UUID cannot be spoofed, by incorporating the authenticated sender into the key of the map/cache. + */ + private fun isDuplicateWithPotentialOptimization(receivedSenderUUID: String, receivedSenderSeqNo: Long, msg: ReceivedMessage): Boolean { + return senderUUIDSeqNoHWM.compute(Pair(receivedSenderUUID, msg.peer)) { key, existingSeqNoHWM -> + val isNewHWM = (existingSeqNoHWM != null && existingSeqNoHWM < receivedSenderSeqNo) + if (isNewHWM) { + // If we are the new HWM, set the HWM to us. + receivedSenderSeqNo + } else { + // If we are a duplicate, unset the HWM, since it seems like re-delivery is happening for traffic from that sender. + // else if we are not a duplicate, (re)set the HWM to us. + if (isDuplicateInDatabase(msg)) null else receivedSenderSeqNo + } + } != receivedSenderSeqNo + } + + /** + * @return true if we have seen this message before. + */ + fun isDuplicate(msg: ReceivedMessage): Boolean { + val receivedSenderUUID = msg.senderUUID + val receivedSenderSeqNo = msg.senderSeqNo + // If we have received a new higher sequence number, then it cannot be a duplicate, and we don't need to check database. + // If we are seeing a sender for the first time, fall back to a database check. + // If we have no information about the sender, also fall back to a database check. + return if (receivedSenderUUID != null && receivedSenderSeqNo != null) { + isDuplicateWithPotentialOptimization(receivedSenderUUID, receivedSenderSeqNo, msg) + } else { + isDuplicateInDatabase(msg) + } + } + + fun persistDeduplicationId(msg: ReceivedMessage) { + processedMessages[msg.uniqueMessageId] = Instant.now() + } + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids") + class ProcessedMessage( + @Id + @Column(name = "message_id", length = 64) + var id: String = "", + + @Column(name = "insertion_time") + var insertionTime: Instant = Instant.now() + ) +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index e0769c8f27..bbe7ed437a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -21,7 +21,6 @@ import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AffinityExecutor -import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.* @@ -102,21 +101,10 @@ class P2PMessagingClient(val config: NodeConfiguration, val cordaVendorProperty = SimpleString("corda-vendor") val releaseVersionProperty = SimpleString("release-version") val platformVersionProperty = SimpleString("platform-version") - private val messageMaxRetryCount: Int = 3 + val senderUUID = SimpleString("sender-uuid") + val senderSeqNo = SimpleString("send-seq-no") - fun createProcessedMessages(): AppendOnlyPersistentMap { - return AppendOnlyPersistentMap( - toPersistentEntityKey = { it.toString }, - fromPersistentEntity = { Pair(DeduplicationId(it.id), it.insertionTime) }, - toPersistentEntity = { key: DeduplicationId, value: Instant -> - ProcessedMessage().apply { - id = key.toString - insertionTime = value - } - }, - persistentEntityClass = ProcessedMessage::class.java - ) - } + private val messageMaxRetryCount: Int = 3 fun createMessageToRedeliver(): PersistentMap, RetryMessage, Long> { return PersistentMap( @@ -138,7 +126,7 @@ class P2PMessagingClient(val config: NodeConfiguration, ) } - private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId) : Message { + private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?) : Message { override val debugTimestamp: Instant = Instant.now() override fun toString() = "$topic#${String(data.bytes)}" } @@ -176,19 +164,8 @@ class P2PMessagingClient(val config: NodeConfiguration, private val handlers = ConcurrentHashMap() - private val processedMessages = createProcessedMessages() - private var messagingExecutor: MessagingExecutor? = null - - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids") - class ProcessedMessage( - @Id - @Column(name = "message_id", length = 64) - var id: String = "", - - @Column(name = "insertion_time") - var insertionTime: Instant = Instant.now() - ) + private val deduplicator = P2PMessageDeduplicator(database) + internal var messagingExecutor: MessagingExecutor? = null @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry") @@ -233,7 +210,8 @@ class P2PMessagingClient(val config: NodeConfiguration, versionInfo, this@P2PMessagingClient, metricRegistry, - queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize + queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize, + ourSenderUUID = deduplicator.ourSenderUUID ) this@P2PMessagingClient.messagingExecutor = messagingExecutor messagingExecutor.start() @@ -247,7 +225,9 @@ class P2PMessagingClient(val config: NodeConfiguration, private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List) { val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" - session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) + if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) { + session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) + } val bridgeConsumer = session.createConsumer(bridgeNotifyQueue) bridgeNotifyConsumer = bridgeConsumer bridgeConsumer.setMessageHandler { msg -> @@ -354,9 +334,7 @@ class P2PMessagingClient(val config: NodeConfiguration, null } ?: return false - val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage) - if (message != null) - deliver(artemisMessage, message) + deliver(artemisMessage) return true } @@ -386,9 +364,11 @@ class P2PMessagingClient(val config: NodeConfiguration, val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) } // Use the magic deduplication property built into Artemis as our message identity too val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) } - log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId" } + val receivedSenderUUID = message.getStringProperty(senderUUID) + val receivedSenderSeqNo = if (message.containsProperty(senderSeqNo)) message.getLongProperty(senderSeqNo) else null + log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" } - return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, message) + return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message) } catch (e: Exception) { log.error("Unable to process message, ignoring it: $message", e) return null @@ -404,12 +384,20 @@ class P2PMessagingClient(val config: NodeConfiguration, override val peer: CordaX500Name, override val platformVersion: Int, override val uniqueMessageId: DeduplicationId, + override val senderUUID: String?, + override val senderSeqNo: Long?, private val message: ClientMessage) : ReceivedMessage { override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) } override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp) override fun toString() = "$topic#$data" } + internal fun deliver(artemisMessage: ClientMessage) { + val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage) + if (message != null) + deliver(artemisMessage, message) + } + private fun deliver(artemisMessage: ClientMessage, msg: ReceivedMessage) { state.checkNotLocked() val deliverTo = handlers[msg.topic] @@ -423,14 +411,13 @@ class P2PMessagingClient(val config: NodeConfiguration, // Note that handlers may re-enter this class. We aren't holding any locks and methods like // start/run/stop have re-entrancy assertions at the top, so it is OK. if (deliverTo != null) { - val isDuplicate = database.transaction { msg.uniqueMessageId in processedMessages } - if (isDuplicate) { + if (deduplicator.isDuplicate(msg)) { log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topic}" } return } val acknowledgeHandle = object : AcknowledgeHandle { override fun persistDeduplicationId() { - processedMessages[msg.uniqueMessageId] = Instant.now() + deduplicator.persistDeduplicationId(msg) } // ACKing a message calls back into the session which isn't thread safe, so we have to ensure it @@ -601,7 +588,7 @@ class P2PMessagingClient(val config: NodeConfiguration, } override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message { - return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId) + return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID) } override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 1d198e7c0e..b9ab9aa88f 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -14,6 +14,7 @@ import net.corda.node.services.api.SchemaService.SchemaOptions import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService +import net.corda.node.services.messaging.P2PMessageDeduplicator import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.persistence.* import net.corda.node.services.transactions.BFTNonValidatingNotaryService @@ -40,7 +41,7 @@ class NodeSchemaService(extraSchemas: Set = emptySet(), includeNot PersistentKeyManagementService.PersistentKey::class.java, NodeSchedulerService.PersistentScheduledState::class.java, NodeAttachmentService.DBAttachment::class.java, - P2PMessagingClient.ProcessedMessage::class.java, + P2PMessageDeduplicator.ProcessedMessage::class.java, P2PMessagingClient.RetryMessage::class.java, NodeAttachmentService.DBAttachment::class.java, PersistentIdentityService.PersistentIdentity::class.java, diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index c3f9f73b89..ff4a7fa595 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -18,6 +18,8 @@ import net.corda.testing.internal.LogHelper import net.corda.testing.internal.rigorousMock import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER +import org.apache.activemq.artemis.api.core.SimpleString import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After @@ -140,6 +142,124 @@ class ArtemisMessagingTest { assertThat(received.platformVersion).isEqualTo(3) } + @Test + fun `we can fake send and receive`() { + val (messagingClient, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient.deliver(fakeMsg) + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + } + + @Test + fun `redelivery from same client is ignored`() { + val (messagingClient, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient.deliver(fakeMsg) + messagingClient.deliver(fakeMsg) + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } + + // Redelivery from a sender who stops and restarts (some re-sends from the sender, with sender state reset with exception of recovered checkpoints) + @Test + fun `re-send from different client is ignored`() { + val (messagingClient1, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient1.deliver(fakeMsg) + + // Now change the sender + try { + val messagingClient2 = createMessagingClient() + startNodeMessagingClient() + val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + + messagingClient1.deliver(fakeMsg2) + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } finally { + messagingClient1.stop() + } + } + + // Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset) + @Test + fun `re-receive from different client is ignored`() { + val (messagingClient1, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient1.deliver(fakeMsg) + + // Now change the receiver + try { + val messagingClient2 = createMessagingClient() + messagingClient2.addMessageHandler(TOPIC) { message, _, handle -> + database.transaction { handle.persistDeduplicationId() } + handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] + receivedMessages.add(message) + } + startNodeMessagingClient() + + messagingClient2.deliver(fakeMsg) + + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } finally { + messagingClient1.stop() + } + } + + // Re-receive on different client from re-started sender + @Test + fun `re-send from different client and re-receive from different client is ignored`() { + val (messagingClient1, receivedMessages) = createAndStartClientAndServer() + val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + messagingClient1.deliver(fakeMsg) + + // Now change the send *and* receiver + val messagingClient2 = createMessagingClient() + try { + startNodeMessagingClient() + val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message) + fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + + val messagingClient3 = createMessagingClient() + messagingClient3.addMessageHandler(TOPIC) { message, _, handle -> + database.transaction { handle.persistDeduplicationId() } + handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] + receivedMessages.add(message) + } + startNodeMessagingClient() + + messagingClient3.deliver(fakeMsg2) + + val received = receivedMessages.take() + assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg") + val received2 = receivedMessages.poll() + assertThat(received2).isNull() + } finally { + messagingClient1.stop() + messagingClient2.stop() + } + } + + private fun startNodeMessagingClient() { messagingClient!!.start() } @@ -152,6 +272,7 @@ class ArtemisMessagingTest { val messagingClient = createMessagingClient(platformVersion = platformVersion) startNodeMessagingClient() messagingClient.addMessageHandler(TOPIC) { message, _, handle -> + database.transaction { handle.persistDeduplicationId() } handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] receivedMessages.add(message) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 6c2ee03fe4..1b0bde00e5 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -22,6 +22,7 @@ import net.corda.node.services.messaging.* import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.testing.node.InMemoryMessagingNetwork.TestMessagingService import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.LoggerFactory import rx.Observable @@ -247,7 +248,8 @@ class InMemoryMessagingNetwork internal constructor( data class InMemoryMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, - override val debugTimestamp: Instant = Instant.now()) : Message { + override val debugTimestamp: Instant = Instant.now(), + override val senderUUID: String? = null) : Message { override fun toString() = "$topic#${String(data.bytes)}" } @@ -256,7 +258,9 @@ class InMemoryMessagingNetwork internal constructor( override val platformVersion: Int, override val uniqueMessageId: DeduplicationId, override val debugTimestamp: Instant, - override val peer: CordaX500Name) : ReceivedMessage + override val peer: CordaX500Name, + override val senderUUID: String? = null, + override val senderSeqNo: Long? = null) : ReceivedMessage /** * A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to