ENT-1391 Create P2P message de-duplication fast path. (#443)

This commit is contained in:
Rick Parker 2018-02-12 18:48:34 +00:00 committed by GitHub
parent e6e2836119
commit ef45900fda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 283 additions and 59 deletions

View File

@ -126,6 +126,7 @@ interface Message {
val data: ByteSequence val data: ByteSequence
val debugTimestamp: Instant val debugTimestamp: Instant
val uniqueMessageId: DeduplicationId val uniqueMessageId: DeduplicationId
val senderUUID: String?
} }
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that. // TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
@ -136,6 +137,7 @@ interface ReceivedMessage : Message {
val peer: CordaX500Name val peer: CordaX500Name
/** Platform version of the sender's node. */ /** Platform version of the sender's node. */
val platformVersion: Int val platformVersion: Int
val senderSeqNo: Long?
} }
/** A singleton that's useful for validating topic strings */ /** A singleton that's useful for validating topic strings */

View File

@ -17,6 +17,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession
import java.util.* import java.util.*
import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread import kotlin.concurrent.thread
interface AddressToArtemisQueueResolver { interface AddressToArtemisQueueResolver {
@ -38,6 +39,7 @@ class MessagingExecutor(
val versionInfo: VersionInfo, val versionInfo: VersionInfo,
val resolver: AddressToArtemisQueueResolver, val resolver: AddressToArtemisQueueResolver,
metricRegistry: MetricRegistry, metricRegistry: MetricRegistry,
val ourSenderUUID: String,
queueBound: Int queueBound: Int
) { ) {
private sealed class Job { private sealed class Job {
@ -59,6 +61,7 @@ class MessagingExecutor(
private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize") private val sendMessageSizeMetric = metricRegistry.histogram("SendMessageSize")
private val sendLatencyMetric = metricRegistry.timer("SendLatency") private val sendLatencyMetric = metricRegistry.timer("SendLatency")
private val sendBatchSizeMetric = metricRegistry.histogram("SendBatchSize") private val sendBatchSizeMetric = metricRegistry.histogram("SendBatchSize")
private val ourSenderSeqNo = AtomicLong()
private companion object { private companion object {
val log = contextLogger() val log = contextLogger()
@ -193,26 +196,34 @@ class MessagingExecutor(
private fun sendJob(job: Job.Send) { private fun sendJob(job: Job.Send) {
val mqAddress = resolver.resolveTargetToArtemisQueue(job.target) val mqAddress = resolver.resolveTargetToArtemisQueue(job.target)
val artemisMessage = session.createMessage(true).apply { val artemisMessage = cordaToArtemisMessage(job.message)
putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor)
putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion)
putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion)
putStringProperty(P2PMessagingClient.topicProperty, SimpleString(job.message.topic))
sendMessageSizeMetric.update(job.message.data.bytes.size)
writeBodyBufferBytes(job.message.data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(job.message.uniqueMessageId.toString))
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (amqDelayMillis > 0 && job.message.topic == FlowMessagingImpl.sessionTopic) {
putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
}
log.trace { log.trace {
"Send to: $mqAddress topic: ${job.message.topic} " + "Send to: $mqAddress topic: ${job.message.topic} " +
"sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}" "sessionID: ${job.message.topic} id: ${job.message.uniqueMessageId}"
} }
producer.send(SimpleString(mqAddress), artemisMessage) { job.sentFuture.set(Unit) } producer.send(SimpleString(mqAddress), artemisMessage, { job.sentFuture.set(Unit) })
}
internal fun cordaToArtemisMessage(message: Message): ClientMessage? {
return session.createMessage(true).apply {
putStringProperty(P2PMessagingClient.cordaVendorProperty, cordaVendor)
putStringProperty(P2PMessagingClient.releaseVersionProperty, releaseVersion)
putIntProperty(P2PMessagingClient.platformVersionProperty, versionInfo.platformVersion)
putStringProperty(P2PMessagingClient.topicProperty, SimpleString(message.topic))
sendMessageSizeMetric.update(message.data.bytes.size)
writeBodyBufferBytes(message.data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString))
// If we are the sender (ie. we are not going through recovery of some sort), use sequence number short cut.
if (ourSenderUUID == message.senderUUID) {
putStringProperty(P2PMessagingClient.senderUUID, SimpleString(ourSenderUUID))
putLongProperty(P2PMessagingClient.senderSeqNo, ourSenderSeqNo.getAndIncrement())
}
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (amqDelayMillis > 0 && message.topic == FlowMessagingImpl.sessionTopic) {
putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
}
} }
private fun acknowledgeJob(job: Job.Acknowledge) { private fun acknowledgeJob(job: Job.Acknowledge) {

View File

@ -0,0 +1,98 @@
package net.corda.node.services.messaging
import com.google.common.cache.CacheBuilder
import net.corda.core.identity.CordaX500Name
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import java.time.Instant
import java.util.*
import java.util.concurrent.TimeUnit
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
/**
* Encapsulate the de-duplication logic.
*/
class P2PMessageDeduplicator(private val database: CordaPersistence) {
val ourSenderUUID = UUID.randomUUID().toString()
private val processedMessages = createProcessedMessages()
// We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers.
// Expire after 7 days since we last touched an entry, to avoid infinite growth.
private val senderUUIDSeqNoHWM: MutableMap<Pair<String, CordaX500Name>, Long> = CacheBuilder.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build<Pair<String, CordaX500Name>, Long>().asMap()
private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, Instant, ProcessedMessage, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString },
fromPersistentEntity = { Pair(DeduplicationId(it.id), it.insertionTime) },
toPersistentEntity = { key: DeduplicationId, value: Instant ->
ProcessedMessage().apply {
id = key.toString
insertionTime = value
}
},
persistentEntityClass = ProcessedMessage::class.java
)
}
private fun isDuplicateInDatabase(msg: ReceivedMessage): Boolean = database.transaction { msg.uniqueMessageId in processedMessages }
/**
* We assign the sender a random identifier [ourSenderUUID] (passed to [MessagingExecutor]). If the sender is also the creator of a message
* (i.e. not from recovered checkpoints), assign a sequence number. Recipients know it is not a duplicate if the sequence number
* is greater than the highest they have seen, otherwise fallback to prior, slower, logic within the database.
*
* The UUIDs will change each time the sender restarts their JVM, and so we may need to prune UUIDs over time. To this end,
* we only remember UUIDs for 7 days from the time we last interacted with them, rebuilding the cached value if we
* ever re-encounter them.
*
* We also ensure the UUID cannot be spoofed, by incorporating the authenticated sender into the key of the map/cache.
*/
private fun isDuplicateWithPotentialOptimization(receivedSenderUUID: String, receivedSenderSeqNo: Long, msg: ReceivedMessage): Boolean {
return senderUUIDSeqNoHWM.compute(Pair(receivedSenderUUID, msg.peer)) { key, existingSeqNoHWM ->
val isNewHWM = (existingSeqNoHWM != null && existingSeqNoHWM < receivedSenderSeqNo)
if (isNewHWM) {
// If we are the new HWM, set the HWM to us.
receivedSenderSeqNo
} else {
// If we are a duplicate, unset the HWM, since it seems like re-delivery is happening for traffic from that sender.
// else if we are not a duplicate, (re)set the HWM to us.
if (isDuplicateInDatabase(msg)) null else receivedSenderSeqNo
}
} != receivedSenderSeqNo
}
/**
* @return true if we have seen this message before.
*/
fun isDuplicate(msg: ReceivedMessage): Boolean {
val receivedSenderUUID = msg.senderUUID
val receivedSenderSeqNo = msg.senderSeqNo
// If we have received a new higher sequence number, then it cannot be a duplicate, and we don't need to check database.
// If we are seeing a sender for the first time, fall back to a database check.
// If we have no information about the sender, also fall back to a database check.
return if (receivedSenderUUID != null && receivedSenderSeqNo != null) {
isDuplicateWithPotentialOptimization(receivedSenderUUID, receivedSenderSeqNo, msg)
} else {
isDuplicateInDatabase(msg)
}
}
fun persistDeduplicationId(msg: ReceivedMessage) {
processedMessages[msg.uniqueMessageId] = Instant.now()
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
class ProcessedMessage(
@Id
@Column(name = "message_id", length = 64)
var id: String = "",
@Column(name = "insertion_time")
var insertionTime: Instant = Instant.now()
)
}

View File

@ -21,7 +21,6 @@ import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.PersistentMap import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
@ -102,21 +101,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
val cordaVendorProperty = SimpleString("corda-vendor") val cordaVendorProperty = SimpleString("corda-vendor")
val releaseVersionProperty = SimpleString("release-version") val releaseVersionProperty = SimpleString("release-version")
val platformVersionProperty = SimpleString("platform-version") val platformVersionProperty = SimpleString("platform-version")
private val messageMaxRetryCount: Int = 3 val senderUUID = SimpleString("sender-uuid")
val senderSeqNo = SimpleString("send-seq-no")
fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, Instant, ProcessedMessage, String> { private val messageMaxRetryCount: Int = 3
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString },
fromPersistentEntity = { Pair(DeduplicationId(it.id), it.insertionTime) },
toPersistentEntity = { key: DeduplicationId, value: Instant ->
ProcessedMessage().apply {
id = key.toString
insertionTime = value
}
},
persistentEntityClass = ProcessedMessage::class.java
)
}
fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> { fun createMessageToRedeliver(): PersistentMap<Long, Pair<Message, MessageRecipients>, RetryMessage, Long> {
return PersistentMap( return PersistentMap(
@ -138,7 +126,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
) )
} }
private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId) : Message { private class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?) : Message {
override val debugTimestamp: Instant = Instant.now() override val debugTimestamp: Instant = Instant.now()
override fun toString() = "$topic#${String(data.bytes)}" override fun toString() = "$topic#${String(data.bytes)}"
} }
@ -176,19 +164,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val handlers = ConcurrentHashMap<String, MessageHandler>() private val handlers = ConcurrentHashMap<String, MessageHandler>()
private val processedMessages = createProcessedMessages() private val deduplicator = P2PMessageDeduplicator(database)
private var messagingExecutor: MessagingExecutor? = null internal var messagingExecutor: MessagingExecutor? = null
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
class ProcessedMessage(
@Id
@Column(name = "message_id", length = 64)
var id: String = "",
@Column(name = "insertion_time")
var insertionTime: Instant = Instant.now()
)
@Entity @Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry") @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry")
@ -233,7 +210,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
versionInfo, versionInfo,
this@P2PMessagingClient, this@P2PMessagingClient,
metricRegistry, metricRegistry,
queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize,
ourSenderUUID = deduplicator.ourSenderUUID
) )
this@P2PMessagingClient.messagingExecutor = messagingExecutor this@P2PMessagingClient.messagingExecutor = messagingExecutor
messagingExecutor.start() messagingExecutor.start()
@ -247,7 +225,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) { private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}"
if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) {
session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue)
}
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue) val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
bridgeNotifyConsumer = bridgeConsumer bridgeNotifyConsumer = bridgeConsumer
bridgeConsumer.setMessageHandler { msg -> bridgeConsumer.setMessageHandler { msg ->
@ -354,9 +334,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
null null
} ?: return false } ?: return false
val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage) deliver(artemisMessage)
if (message != null)
deliver(artemisMessage, message)
return true return true
} }
@ -386,9 +364,11 @@ class P2PMessagingClient(val config: NodeConfiguration,
val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) } val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) }
// Use the magic deduplication property built into Artemis as our message identity too // Use the magic deduplication property built into Artemis as our message identity too
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) } val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId" } val receivedSenderUUID = message.getStringProperty(senderUUID)
val receivedSenderSeqNo = if (message.containsProperty(senderSeqNo)) message.getLongProperty(senderSeqNo) else null
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" }
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, message) return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message)
} catch (e: Exception) { } catch (e: Exception) {
log.error("Unable to process message, ignoring it: $message", e) log.error("Unable to process message, ignoring it: $message", e)
return null return null
@ -404,12 +384,20 @@ class P2PMessagingClient(val config: NodeConfiguration,
override val peer: CordaX500Name, override val peer: CordaX500Name,
override val platformVersion: Int, override val platformVersion: Int,
override val uniqueMessageId: DeduplicationId, override val uniqueMessageId: DeduplicationId,
override val senderUUID: String?,
override val senderSeqNo: Long?,
private val message: ClientMessage) : ReceivedMessage { private val message: ClientMessage) : ReceivedMessage {
override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) } override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) }
override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp) override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp)
override fun toString() = "$topic#$data" override fun toString() = "$topic#$data"
} }
internal fun deliver(artemisMessage: ClientMessage) {
val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage)
if (message != null)
deliver(artemisMessage, message)
}
private fun deliver(artemisMessage: ClientMessage, msg: ReceivedMessage) { private fun deliver(artemisMessage: ClientMessage, msg: ReceivedMessage) {
state.checkNotLocked() state.checkNotLocked()
val deliverTo = handlers[msg.topic] val deliverTo = handlers[msg.topic]
@ -423,14 +411,13 @@ class P2PMessagingClient(val config: NodeConfiguration,
// Note that handlers may re-enter this class. We aren't holding any locks and methods like // Note that handlers may re-enter this class. We aren't holding any locks and methods like
// start/run/stop have re-entrancy assertions at the top, so it is OK. // start/run/stop have re-entrancy assertions at the top, so it is OK.
if (deliverTo != null) { if (deliverTo != null) {
val isDuplicate = database.transaction { msg.uniqueMessageId in processedMessages } if (deduplicator.isDuplicate(msg)) {
if (isDuplicate) {
log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topic}" } log.trace { "Discard duplicate message ${msg.uniqueMessageId} for ${msg.topic}" }
return return
} }
val acknowledgeHandle = object : AcknowledgeHandle { val acknowledgeHandle = object : AcknowledgeHandle {
override fun persistDeduplicationId() { override fun persistDeduplicationId() {
processedMessages[msg.uniqueMessageId] = Instant.now() deduplicator.persistDeduplicationId(msg)
} }
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it // ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
@ -601,7 +588,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
} }
override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message { override fun createMessage(topic: String, data: ByteArray, deduplicationId: DeduplicationId): Message {
return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId) return NodeClientMessage(topic, OpaqueBytes(data), deduplicationId, deduplicator.ourSenderUUID)
} }
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {

View File

@ -14,6 +14,7 @@ import net.corda.node.services.api.SchemaService.SchemaOptions
import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.P2PMessageDeduplicator
import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.persistence.* import net.corda.node.services.persistence.*
import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.BFTNonValidatingNotaryService
@ -40,7 +41,7 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet(), includeNot
PersistentKeyManagementService.PersistentKey::class.java, PersistentKeyManagementService.PersistentKey::class.java,
NodeSchedulerService.PersistentScheduledState::class.java, NodeSchedulerService.PersistentScheduledState::class.java,
NodeAttachmentService.DBAttachment::class.java, NodeAttachmentService.DBAttachment::class.java,
P2PMessagingClient.ProcessedMessage::class.java, P2PMessageDeduplicator.ProcessedMessage::class.java,
P2PMessagingClient.RetryMessage::class.java, P2PMessagingClient.RetryMessage::class.java,
NodeAttachmentService.DBAttachment::class.java, NodeAttachmentService.DBAttachment::class.java,
PersistentIdentityService.PersistentIdentity::class.java, PersistentIdentityService.PersistentIdentity::class.java,

View File

@ -18,6 +18,8 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After import org.junit.After
@ -140,6 +142,124 @@ class ArtemisMessagingTest {
assertThat(received.platformVersion).isEqualTo(3) assertThat(received.platformVersion).isEqualTo(3)
} }
@Test
fun `we can fake send and receive`() {
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
messagingClient.deliver(fakeMsg)
val received = receivedMessages.take()
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
}
@Test
fun `redelivery from same client is ignored`() {
val (messagingClient, receivedMessages) = createAndStartClientAndServer()
val message = messagingClient.createMessage(TOPIC, data = "first msg".toByteArray())
val fakeMsg = messagingClient.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
messagingClient.deliver(fakeMsg)
messagingClient.deliver(fakeMsg)
val received = receivedMessages.take()
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
val received2 = receivedMessages.poll()
assertThat(received2).isNull()
}
// Redelivery from a sender who stops and restarts (some re-sends from the sender, with sender state reset with exception of recovered checkpoints)
@Test
fun `re-send from different client is ignored`() {
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
messagingClient1.deliver(fakeMsg)
// Now change the sender
try {
val messagingClient2 = createMessagingClient()
startNodeMessagingClient()
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
messagingClient1.deliver(fakeMsg2)
val received = receivedMessages.take()
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
val received2 = receivedMessages.poll()
assertThat(received2).isNull()
} finally {
messagingClient1.stop()
}
}
// Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset)
@Test
fun `re-receive from different client is ignored`() {
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
messagingClient1.deliver(fakeMsg)
// Now change the receiver
try {
val messagingClient2 = createMessagingClient()
messagingClient2.addMessageHandler(TOPIC) { message, _, handle ->
database.transaction { handle.persistDeduplicationId() }
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message)
}
startNodeMessagingClient()
messagingClient2.deliver(fakeMsg)
val received = receivedMessages.take()
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
val received2 = receivedMessages.poll()
assertThat(received2).isNull()
} finally {
messagingClient1.stop()
}
}
// Re-receive on different client from re-started sender
@Test
fun `re-send from different client and re-receive from different client is ignored`() {
val (messagingClient1, receivedMessages) = createAndStartClientAndServer()
val message = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
val fakeMsg = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
messagingClient1.deliver(fakeMsg)
// Now change the send *and* receiver
val messagingClient2 = createMessagingClient()
try {
startNodeMessagingClient()
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
val messagingClient3 = createMessagingClient()
messagingClient3.addMessageHandler(TOPIC) { message, _, handle ->
database.transaction { handle.persistDeduplicationId() }
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message)
}
startNodeMessagingClient()
messagingClient3.deliver(fakeMsg2)
val received = receivedMessages.take()
assertThat(String(received.data.bytes, Charsets.UTF_8)).isEqualTo("first msg")
val received2 = receivedMessages.poll()
assertThat(received2).isNull()
} finally {
messagingClient1.stop()
messagingClient2.stop()
}
}
private fun startNodeMessagingClient() { private fun startNodeMessagingClient() {
messagingClient!!.start() messagingClient!!.start()
} }
@ -152,6 +272,7 @@ class ArtemisMessagingTest {
val messagingClient = createMessagingClient(platformVersion = platformVersion) val messagingClient = createMessagingClient(platformVersion = platformVersion)
startNodeMessagingClient() startNodeMessagingClient()
messagingClient.addMessageHandler(TOPIC) { message, _, handle -> messagingClient.addMessageHandler(TOPIC) { message, _, handle ->
database.transaction { handle.persistDeduplicationId() }
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message) receivedMessages.add(message)
} }

View File

@ -22,6 +22,7 @@ import net.corda.node.services.messaging.*
import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.node.InMemoryMessagingNetwork.TestMessagingService
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable import rx.Observable
@ -247,7 +248,8 @@ class InMemoryMessagingNetwork internal constructor(
data class InMemoryMessage(override val topic: String, data class InMemoryMessage(override val topic: String,
override val data: ByteSequence, override val data: ByteSequence,
override val uniqueMessageId: DeduplicationId, override val uniqueMessageId: DeduplicationId,
override val debugTimestamp: Instant = Instant.now()) : Message { override val debugTimestamp: Instant = Instant.now(),
override val senderUUID: String? = null) : Message {
override fun toString() = "$topic#${String(data.bytes)}" override fun toString() = "$topic#${String(data.bytes)}"
} }
@ -256,7 +258,9 @@ class InMemoryMessagingNetwork internal constructor(
override val platformVersion: Int, override val platformVersion: Int,
override val uniqueMessageId: DeduplicationId, override val uniqueMessageId: DeduplicationId,
override val debugTimestamp: Instant, override val debugTimestamp: Instant,
override val peer: CordaX500Name) : ReceivedMessage override val peer: CordaX500Name,
override val senderUUID: String? = null,
override val senderSeqNo: Long? = null) : ReceivedMessage
/** /**
* A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to * A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to