mirror of
https://github.com/corda/corda.git
synced 2025-01-15 17:30:02 +00:00
ENT-1729 P2P message de-duplication performance optimisation has a flaw (#675)
This commit is contained in:
parent
6c2cfc3880
commit
23184b7495
@ -11,42 +11,49 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.mina.util.ConcurrentHashSet
|
||||
import java.io.Serializable
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
|
||||
typealias SenderHashToSeqNo = Pair<String, Long?>
|
||||
|
||||
/**
|
||||
* Encapsulate the de-duplication logic.
|
||||
*/
|
||||
class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
val ourSenderUUID = UUID.randomUUID().toString()
|
||||
|
||||
// A temporary in-memory set of deduplication IDs. When we receive a message we don't persist the ID immediately,
|
||||
// A temporary in-memory set of deduplication IDs and associated high water mark details.
|
||||
// When we receive a message we don't persist the ID immediately,
|
||||
// so we store the ID here in the meantime (until the persisting db tx has committed). This is because Artemis may
|
||||
// redeliver messages to the same consumer if they weren't ACKed.
|
||||
private val beingProcessedMessages = ConcurrentHashSet<DeduplicationId>()
|
||||
private val beingProcessedMessages = ConcurrentHashMap<DeduplicationId, MessageMeta>()
|
||||
private val processedMessages = createProcessedMessages()
|
||||
// We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers.
|
||||
// Expire after 7 days since we last touched an entry, to avoid infinite growth.
|
||||
private val senderUUIDSeqNoHWM: MutableMap<Triple<String, CordaX500Name, Boolean>, Long> = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build<Triple<String, CordaX500Name, Boolean>, Long>().asMap()
|
||||
private val senderUUIDSeqNoHWM: MutableMap<SenderKey, SenderHashToSeqNo> = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build<SenderKey, SenderHashToSeqNo>().asMap()
|
||||
|
||||
private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, Instant, ProcessedMessage, String> {
|
||||
private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
toPersistentEntityKey = { it.toString },
|
||||
fromPersistentEntity = { Pair(DeduplicationId(it.id), it.insertionTime) },
|
||||
toPersistentEntity = { key: DeduplicationId, value: Instant ->
|
||||
fromPersistentEntity = { Pair(DeduplicationId(it.id), MessageMeta(it.insertionTime, it.hash, it.seqNo)) },
|
||||
toPersistentEntity = { key: DeduplicationId, value: MessageMeta ->
|
||||
ProcessedMessage().apply {
|
||||
id = key.toString
|
||||
insertionTime = value
|
||||
insertionTime = value.insertionTime
|
||||
hash = value.senderHash
|
||||
seqNo = value.senderSeqNo
|
||||
}
|
||||
},
|
||||
persistentEntityClass = ProcessedMessage::class.java
|
||||
@ -67,24 +74,40 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
* We also ensure the UUID cannot be spoofed, by incorporating the authenticated sender into the key of the map/cache.
|
||||
*/
|
||||
private fun isDuplicateWithPotentialOptimization(receivedSenderUUID: String, receivedSenderSeqNo: Long, msg: ReceivedMessage): Boolean {
|
||||
return senderUUIDSeqNoHWM.compute(Triple(receivedSenderUUID, msg.peer, msg.isSessionInit)) { key, existingSeqNoHWM ->
|
||||
val isNewHWM = (existingSeqNoHWM != null && existingSeqNoHWM < receivedSenderSeqNo)
|
||||
if (isNewHWM) {
|
||||
// If we are the new HWM, set the HWM to us.
|
||||
receivedSenderSeqNo
|
||||
} else {
|
||||
// If we are a duplicate, unset the HWM, since it seems like re-delivery is happening for traffic from that sender.
|
||||
// else if we are not a duplicate, (re)set the HWM to us.
|
||||
if (isDuplicateInDatabase(msg)) null else receivedSenderSeqNo
|
||||
}
|
||||
} != receivedSenderSeqNo
|
||||
val senderKey = SenderKey(receivedSenderUUID, msg.peer, msg.isSessionInit)
|
||||
val (senderHash, existingSeqNoHWM) = senderUUIDSeqNoHWM.computeIfAbsent(senderKey) {
|
||||
highestSeqNoHWMInDatabaseFor(senderKey)
|
||||
}
|
||||
val isNewHWM = (existingSeqNoHWM == null || existingSeqNoHWM < receivedSenderSeqNo)
|
||||
return if (isNewHWM) {
|
||||
senderUUIDSeqNoHWM[senderKey] = senderHash to receivedSenderSeqNo
|
||||
false
|
||||
} else isDuplicateInDatabase(msg)
|
||||
}
|
||||
|
||||
/**
|
||||
* Work out the highest sequence number for the given sender, as persisted last time we ran.
|
||||
*
|
||||
* TODO: consider the performance of doing this per sender vs. one big load at startup, vs. adding an index (and impact on inserts).
|
||||
*/
|
||||
private fun highestSeqNoHWMInDatabaseFor(senderKey: SenderKey): SenderHashToSeqNo {
|
||||
val senderHash = senderHash(senderKey)
|
||||
return senderHash to database.transaction {
|
||||
val cb1 = session.criteriaBuilder
|
||||
val cq1 = cb1.createQuery(Long::class.java)
|
||||
val root = cq1.from(ProcessedMessage::class.java)
|
||||
session.createQuery(cq1.select(cb1.max(root.get<Long>(ProcessedMessage::seqNo.name))).where(cb1.equal(root.get<String>(ProcessedMessage::hash.name), senderHash))).singleResult
|
||||
}
|
||||
}
|
||||
|
||||
// We need to incorporate the sending party, and the sessionInit flag as per the in-memory cache.
|
||||
private fun senderHash(senderKey: SenderKey) = SecureHash.sha256(senderKey.peer.toString() + senderKey.isSessionInit.toString() + senderKey.senderUUID).toString()
|
||||
|
||||
/**
|
||||
* @return true if we have seen this message before.
|
||||
*/
|
||||
fun isDuplicate(msg: ReceivedMessage): Boolean {
|
||||
if (msg.uniqueMessageId in beingProcessedMessages) {
|
||||
if (beingProcessedMessages.containsKey(msg.uniqueMessageId)) {
|
||||
return true
|
||||
}
|
||||
val receivedSenderUUID = msg.senderUUID
|
||||
@ -102,15 +125,20 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
/**
|
||||
* Called the first time we encounter [deduplicationId].
|
||||
*/
|
||||
fun signalMessageProcessStart(deduplicationId: DeduplicationId) {
|
||||
beingProcessedMessages.add(deduplicationId)
|
||||
fun signalMessageProcessStart(msg: ReceivedMessage) {
|
||||
val receivedSenderUUID = msg.senderUUID
|
||||
val receivedSenderSeqNo = msg.senderSeqNo
|
||||
// We don't want a mix of nulls and values so we ensure that here.
|
||||
val senderHash: String? = if (receivedSenderUUID != null && receivedSenderSeqNo != null) senderUUIDSeqNoHWM[SenderKey(receivedSenderUUID, msg.peer, msg.isSessionInit)]?.first else null
|
||||
val senderSeqNo: Long? = if (senderHash != null) msg.senderSeqNo else null
|
||||
beingProcessedMessages[msg.uniqueMessageId] = MessageMeta(Instant.now(), senderHash, senderSeqNo)
|
||||
}
|
||||
|
||||
/**
|
||||
* Called inside a DB transaction to persist [deduplicationId].
|
||||
*/
|
||||
fun persistDeduplicationId(deduplicationId: DeduplicationId) {
|
||||
processedMessages[deduplicationId] = Instant.now()
|
||||
processedMessages[deduplicationId] = beingProcessedMessages[deduplicationId]!!
|
||||
}
|
||||
|
||||
/**
|
||||
@ -129,6 +157,16 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
var id: String = "",
|
||||
|
||||
@Column(name = "insertion_time")
|
||||
var insertionTime: Instant = Instant.now()
|
||||
)
|
||||
var insertionTime: Instant = Instant.now(),
|
||||
|
||||
@Column(name = "sender", length = 64)
|
||||
var hash: String? = "",
|
||||
|
||||
@Column(name = "sequence_number")
|
||||
var seqNo: Long? = null
|
||||
) : Serializable
|
||||
|
||||
private data class MessageMeta(val insertionTime: Instant, val senderHash: String?, val senderSeqNo: Long?)
|
||||
|
||||
private data class SenderKey(val senderUUID: String, val peer: CordaX500Name, val isSessionInit: Boolean)
|
||||
}
|
@ -27,11 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.LifecycleSupport
|
||||
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
|
||||
@ -44,14 +40,11 @@ import net.corda.node.utilities.PersistentMap
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -61,12 +54,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import rx.Observable
|
||||
import rx.Subscription
|
||||
@ -192,17 +180,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val deduplicator = P2PMessageDeduplicator(database)
|
||||
internal var messagingExecutor: MessagingExecutor? = null
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids")
|
||||
class ProcessedMessage(
|
||||
@Id
|
||||
@Column(name = "message_id", length = 64)
|
||||
var uuid: String = "",
|
||||
|
||||
@Column(name = "insertion_time")
|
||||
var insertionTime: Instant = Instant.now()
|
||||
) : Serializable
|
||||
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry")
|
||||
class RetryMessage(
|
||||
@ -456,7 +433,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage.uniqueMessageId)
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
deliver(cordaMessage, artemisMessage)
|
||||
} else {
|
||||
log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" }
|
||||
|
@ -29,4 +29,10 @@
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="add_dedup_columns">
|
||||
<addColumn tableName="node_message_ids">
|
||||
<column name="sender" type="NVARCHAR(64)"/>
|
||||
<column name="sequence_number" type="BIGINT"/>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
</databaseChangeLog>
|
||||
|
@ -191,6 +191,7 @@ class ArtemisMessagingTest {
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
startNodeMessagingClient()
|
||||
|
||||
val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message)
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
@ -234,6 +235,51 @@ class ArtemisMessagingTest {
|
||||
}
|
||||
}
|
||||
|
||||
// Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset), but the original
|
||||
// messages were recorded as consumed out of order, and only the *second* message was acked.
|
||||
@Test
|
||||
fun `re-receive from different client is not ignored when acked out of order`() {
|
||||
// Don't ack first message, pretend we exit before that happens (but after second message is acked).
|
||||
val (messagingClient1, receivedMessages) = createAndStartClientAndServer(dontAckCondition = { received -> String(received.data.bytes, Charsets.UTF_8) == "first msg" })
|
||||
val message1 = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray())
|
||||
val message2 = messagingClient1.createMessage(TOPIC, data = "second msg".toByteArray())
|
||||
val fakeMsg1 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message1)
|
||||
val fakeMsg2 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message2)
|
||||
fakeMsg1!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
messagingClient1.deliver(fakeMsg1)
|
||||
messagingClient1.deliver(fakeMsg2)
|
||||
|
||||
// Now change the receiver
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
// The try-finally causes the test to fail if there's a duplicate insert (which, naturally, is an error but otherwise gets swallowed).
|
||||
try {
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
} finally {
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
messagingClient2.deliver(fakeMsg1)
|
||||
messagingClient2.deliver(fakeMsg2)
|
||||
|
||||
// Should receive 2 and then 1 (and not 2 again).
|
||||
val received = receivedMessages.take()
|
||||
assertThat(received.senderSeqNo).isEqualTo(1)
|
||||
val received2 = receivedMessages.poll()
|
||||
assertThat(received2.senderSeqNo).isEqualTo(0)
|
||||
val received3 = receivedMessages.poll()
|
||||
assertThat(received3).isNull()
|
||||
} finally {
|
||||
messagingClient1.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// Re-receive on different client from re-started sender
|
||||
@Test
|
||||
fun `re-send from different client and re-receive from different client is ignored`() {
|
||||
@ -274,13 +320,14 @@ class ArtemisMessagingTest {
|
||||
messagingClient!!.start()
|
||||
}
|
||||
|
||||
private fun createAndStartClientAndServer(platformVersion: Int = 1): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
|
||||
private fun createAndStartClientAndServer(platformVersion: Int = 1, dontAckCondition: (msg: ReceivedMessage) -> Boolean = { false }): Pair<P2PMessagingClient, BlockingQueue<ReceivedMessage>> {
|
||||
val receivedMessages = LinkedBlockingQueue<ReceivedMessage>()
|
||||
|
||||
createMessagingServer().start()
|
||||
|
||||
val messagingClient = createMessagingClient(platformVersion = platformVersion)
|
||||
messagingClient.addMessageHandler(TOPIC) { message, _, handle ->
|
||||
if (dontAckCondition(message)) return@addMessageHandler
|
||||
database.transaction { handle.insideDatabaseTransaction() }
|
||||
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(message)
|
||||
|
Loading…
Reference in New Issue
Block a user