diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt index 4782fbcfdd..0de1464d5f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt @@ -11,42 +11,49 @@ package net.corda.node.services.messaging import com.github.benmanes.caffeine.cache.Caffeine +import net.corda.core.crypto.SecureHash import net.corda.core.identity.CordaX500Name import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX -import org.apache.mina.util.ConcurrentHashSet +import java.io.Serializable import java.time.Instant import java.util.* +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import javax.persistence.Column import javax.persistence.Entity import javax.persistence.Id +typealias SenderHashToSeqNo = Pair + /** * Encapsulate the de-duplication logic. */ class P2PMessageDeduplicator(private val database: CordaPersistence) { val ourSenderUUID = UUID.randomUUID().toString() - // A temporary in-memory set of deduplication IDs. When we receive a message we don't persist the ID immediately, + // A temporary in-memory set of deduplication IDs and associated high water mark details. + // When we receive a message we don't persist the ID immediately, // so we store the ID here in the meantime (until the persisting db tx has committed). This is because Artemis may // redeliver messages to the same consumer if they weren't ACKed. - private val beingProcessedMessages = ConcurrentHashSet() + private val beingProcessedMessages = ConcurrentHashMap() private val processedMessages = createProcessedMessages() // We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers. // Expire after 7 days since we last touched an entry, to avoid infinite growth. - private val senderUUIDSeqNoHWM: MutableMap, Long> = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build, Long>().asMap() + private val senderUUIDSeqNoHWM: MutableMap = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build().asMap() - private fun createProcessedMessages(): AppendOnlyPersistentMap { + private fun createProcessedMessages(): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( toPersistentEntityKey = { it.toString }, - fromPersistentEntity = { Pair(DeduplicationId(it.id), it.insertionTime) }, - toPersistentEntity = { key: DeduplicationId, value: Instant -> + fromPersistentEntity = { Pair(DeduplicationId(it.id), MessageMeta(it.insertionTime, it.hash, it.seqNo)) }, + toPersistentEntity = { key: DeduplicationId, value: MessageMeta -> ProcessedMessage().apply { id = key.toString - insertionTime = value + insertionTime = value.insertionTime + hash = value.senderHash + seqNo = value.senderSeqNo } }, persistentEntityClass = ProcessedMessage::class.java @@ -67,24 +74,40 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) { * We also ensure the UUID cannot be spoofed, by incorporating the authenticated sender into the key of the map/cache. */ private fun isDuplicateWithPotentialOptimization(receivedSenderUUID: String, receivedSenderSeqNo: Long, msg: ReceivedMessage): Boolean { - return senderUUIDSeqNoHWM.compute(Triple(receivedSenderUUID, msg.peer, msg.isSessionInit)) { key, existingSeqNoHWM -> - val isNewHWM = (existingSeqNoHWM != null && existingSeqNoHWM < receivedSenderSeqNo) - if (isNewHWM) { - // If we are the new HWM, set the HWM to us. - receivedSenderSeqNo - } else { - // If we are a duplicate, unset the HWM, since it seems like re-delivery is happening for traffic from that sender. - // else if we are not a duplicate, (re)set the HWM to us. - if (isDuplicateInDatabase(msg)) null else receivedSenderSeqNo - } - } != receivedSenderSeqNo + val senderKey = SenderKey(receivedSenderUUID, msg.peer, msg.isSessionInit) + val (senderHash, existingSeqNoHWM) = senderUUIDSeqNoHWM.computeIfAbsent(senderKey) { + highestSeqNoHWMInDatabaseFor(senderKey) + } + val isNewHWM = (existingSeqNoHWM == null || existingSeqNoHWM < receivedSenderSeqNo) + return if (isNewHWM) { + senderUUIDSeqNoHWM[senderKey] = senderHash to receivedSenderSeqNo + false + } else isDuplicateInDatabase(msg) } + /** + * Work out the highest sequence number for the given sender, as persisted last time we ran. + * + * TODO: consider the performance of doing this per sender vs. one big load at startup, vs. adding an index (and impact on inserts). + */ + private fun highestSeqNoHWMInDatabaseFor(senderKey: SenderKey): SenderHashToSeqNo { + val senderHash = senderHash(senderKey) + return senderHash to database.transaction { + val cb1 = session.criteriaBuilder + val cq1 = cb1.createQuery(Long::class.java) + val root = cq1.from(ProcessedMessage::class.java) + session.createQuery(cq1.select(cb1.max(root.get(ProcessedMessage::seqNo.name))).where(cb1.equal(root.get(ProcessedMessage::hash.name), senderHash))).singleResult + } + } + + // We need to incorporate the sending party, and the sessionInit flag as per the in-memory cache. + private fun senderHash(senderKey: SenderKey) = SecureHash.sha256(senderKey.peer.toString() + senderKey.isSessionInit.toString() + senderKey.senderUUID).toString() + /** * @return true if we have seen this message before. */ fun isDuplicate(msg: ReceivedMessage): Boolean { - if (msg.uniqueMessageId in beingProcessedMessages) { + if (beingProcessedMessages.containsKey(msg.uniqueMessageId)) { return true } val receivedSenderUUID = msg.senderUUID @@ -102,15 +125,20 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) { /** * Called the first time we encounter [deduplicationId]. */ - fun signalMessageProcessStart(deduplicationId: DeduplicationId) { - beingProcessedMessages.add(deduplicationId) + fun signalMessageProcessStart(msg: ReceivedMessage) { + val receivedSenderUUID = msg.senderUUID + val receivedSenderSeqNo = msg.senderSeqNo + // We don't want a mix of nulls and values so we ensure that here. + val senderHash: String? = if (receivedSenderUUID != null && receivedSenderSeqNo != null) senderUUIDSeqNoHWM[SenderKey(receivedSenderUUID, msg.peer, msg.isSessionInit)]?.first else null + val senderSeqNo: Long? = if (senderHash != null) msg.senderSeqNo else null + beingProcessedMessages[msg.uniqueMessageId] = MessageMeta(Instant.now(), senderHash, senderSeqNo) } /** * Called inside a DB transaction to persist [deduplicationId]. */ fun persistDeduplicationId(deduplicationId: DeduplicationId) { - processedMessages[deduplicationId] = Instant.now() + processedMessages[deduplicationId] = beingProcessedMessages[deduplicationId]!! } /** @@ -129,6 +157,16 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) { var id: String = "", @Column(name = "insertion_time") - var insertionTime: Instant = Instant.now() - ) + var insertionTime: Instant = Instant.now(), + + @Column(name = "sender", length = 64) + var hash: String? = "", + + @Column(name = "sequence_number") + var seqNo: Long? = null + ) : Serializable + + private data class MessageMeta(val insertionTime: Instant, val senderHash: String?, val senderSeqNo: Long?) + + private data class SenderKey(val senderUUID: String, val peer: CordaX500Name, val isSessionInit: Boolean) } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 51b41bbccc..48001bfd76 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -27,11 +27,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.trace +import net.corda.core.utilities.* import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer @@ -44,14 +40,11 @@ import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -61,12 +54,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ActiveMQClient -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import org.apache.activemq.artemis.api.core.client.ClientMessage -import org.apache.activemq.artemis.api.core.client.ClientProducer -import org.apache.activemq.artemis.api.core.client.ClientSession -import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.api.core.client.* import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import rx.Observable import rx.Subscription @@ -192,17 +180,6 @@ class P2PMessagingClient(val config: NodeConfiguration, private val deduplicator = P2PMessageDeduplicator(database) internal var messagingExecutor: MessagingExecutor? = null - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_ids") - class ProcessedMessage( - @Id - @Column(name = "message_id", length = 64) - var uuid: String = "", - - @Column(name = "insertion_time") - var insertionTime: Instant = Instant.now() - ) : Serializable - @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}message_retry") class RetryMessage( @@ -456,7 +433,7 @@ class P2PMessagingClient(val config: NodeConfiguration, artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> if (!deduplicator.isDuplicate(cordaMessage)) { - deduplicator.signalMessageProcessStart(cordaMessage.uniqueMessageId) + deduplicator.signalMessageProcessStart(cordaMessage) deliver(cordaMessage, artemisMessage) } else { log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" } diff --git a/node/src/main/resources/migration/node-core.changelog-v3.xml b/node/src/main/resources/migration/node-core.changelog-v3.xml index 9e0cc17293..5c34a74237 100644 --- a/node/src/main/resources/migration/node-core.changelog-v3.xml +++ b/node/src/main/resources/migration/node-core.changelog-v3.xml @@ -29,4 +29,10 @@ + + + + + + diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index f653517196..b78f6a603c 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -191,6 +191,7 @@ class ArtemisMessagingTest { try { val messagingClient2 = createMessagingClient() startNodeMessagingClient() + val fakeMsg2 = messagingClient2.messagingExecutor!!.cordaToArtemisMessage(message) fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) @@ -234,6 +235,51 @@ class ArtemisMessagingTest { } } + // Redelivery to a receiver who stops and restarts (some re-deliveries from Artemis, but with receiver state reset), but the original + // messages were recorded as consumed out of order, and only the *second* message was acked. + @Test + fun `re-receive from different client is not ignored when acked out of order`() { + // Don't ack first message, pretend we exit before that happens (but after second message is acked). + val (messagingClient1, receivedMessages) = createAndStartClientAndServer(dontAckCondition = { received -> String(received.data.bytes, Charsets.UTF_8) == "first msg" }) + val message1 = messagingClient1.createMessage(TOPIC, data = "first msg".toByteArray()) + val message2 = messagingClient1.createMessage(TOPIC, data = "second msg".toByteArray()) + val fakeMsg1 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message1) + val fakeMsg2 = messagingClient1.messagingExecutor!!.cordaToArtemisMessage(message2) + fakeMsg1!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US")) + + messagingClient1.deliver(fakeMsg1) + messagingClient1.deliver(fakeMsg2) + + // Now change the receiver + try { + val messagingClient2 = createMessagingClient() + messagingClient2.addMessageHandler(TOPIC) { msg, _, handle -> + // The try-finally causes the test to fail if there's a duplicate insert (which, naturally, is an error but otherwise gets swallowed). + try { + database.transaction { handle.insideDatabaseTransaction() } + } finally { + handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] + receivedMessages.add(msg) + } + } + startNodeMessagingClient() + + messagingClient2.deliver(fakeMsg1) + messagingClient2.deliver(fakeMsg2) + + // Should receive 2 and then 1 (and not 2 again). + val received = receivedMessages.take() + assertThat(received.senderSeqNo).isEqualTo(1) + val received2 = receivedMessages.poll() + assertThat(received2.senderSeqNo).isEqualTo(0) + val received3 = receivedMessages.poll() + assertThat(received3).isNull() + } finally { + messagingClient1.stop() + } + } + // Re-receive on different client from re-started sender @Test fun `re-send from different client and re-receive from different client is ignored`() { @@ -274,13 +320,14 @@ class ArtemisMessagingTest { messagingClient!!.start() } - private fun createAndStartClientAndServer(platformVersion: Int = 1): Pair> { + private fun createAndStartClientAndServer(platformVersion: Int = 1, dontAckCondition: (msg: ReceivedMessage) -> Boolean = { false }): Pair> { val receivedMessages = LinkedBlockingQueue() createMessagingServer().start() val messagingClient = createMessagingClient(platformVersion = platformVersion) messagingClient.addMessageHandler(TOPIC) { message, _, handle -> + if (dontAckCondition(message)) return@addMessageHandler database.transaction { handle.insideDatabaseTransaction() } handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages] receivedMessages.add(message)