From 188027c1f5280cb9270dced2a0dfa323ce6484cd Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Thu, 24 Nov 2022 10:27:00 +0000 Subject: [PATCH] ENT-8811 OS port of flow draining fixes (#7269) --- .../messaging/P2PMessageDeduplicator.kt | 14 +++++++--- .../services/messaging/P2PMessagingClient.kt | 28 +++++++++++++++---- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt index 4a63927fad..bb95590de6 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt @@ -24,6 +24,10 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa private val beingProcessedMessages = ConcurrentHashMap() private val processedMessages = createProcessedMessages(cacheFactory) + enum class Outcome { + NEW, DUPLICATE, IN_FLIGHT + } + private fun createProcessedMessages(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( cacheFactory = cacheFactory, @@ -48,15 +52,17 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa private fun senderHash(senderKey: SenderKey) = SecureHash.sha256(senderKey.peer.toString() + senderKey.isSessionInit.toString() + senderKey.senderUUID).toString() /** - * @return true if we have seen this message before. + * @return IN_FLIGHT if this message is currently being processed by the state machine, otherwise indicate if DUPLICATE or NEW. */ - fun isDuplicate(msg: ReceivedMessage): Boolean { + fun checkDuplicate(msg: ReceivedMessage): Outcome { if (beingProcessedMessages.containsKey(msg.uniqueMessageId)) { - return true + return Outcome.IN_FLIGHT } - return isDuplicateInDatabase(msg) + return booleanToEnum(isDuplicateInDatabase(msg)) } + private fun booleanToEnum(isDuplicate: Boolean): Outcome = if (isDuplicate) Outcome.DUPLICATE else Outcome.NEW + /** * Called the first time we encounter [deduplicationId]. */ diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 7e0aa7dd02..ce0fcfc9f8 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -18,7 +18,15 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.* +import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds +import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer @@ -31,12 +39,15 @@ import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.errorAndTerminate import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.* +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry @@ -65,7 +76,9 @@ import rx.subjects.PublishSubject import java.security.PublicKey import java.time.Duration import java.time.Instant -import java.util.* +import java.util.Collections +import java.util.Timer +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import javax.annotation.concurrent.ThreadSafe @@ -440,12 +453,15 @@ class P2PMessagingClient(val config: NodeConfiguration, internal fun deliver(artemisMessage: ClientMessage) { artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> - if (!deduplicator.isDuplicate(cordaMessage)) { + val outcome = deduplicator.checkDuplicate(cordaMessage) + if (outcome == P2PMessageDeduplicator.Outcome.NEW) { deduplicator.signalMessageProcessStart(cordaMessage) deliver(cordaMessage, artemisMessage) - } else { - log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" } + } else if (outcome == P2PMessageDeduplicator.Outcome.DUPLICATE) { + log.debug { "Acknowledge duplicate message id: ${cordaMessage.uniqueMessageId} senderUUID: ${cordaMessage.senderUUID} senderSeqNo: ${cordaMessage.senderSeqNo} isSessionInit: ${cordaMessage.isSessionInit}" } messagingExecutor!!.acknowledge(artemisMessage) + } else { + log.debug { "Discard in-flight message id: ${cordaMessage.uniqueMessageId} senderUUID: ${cordaMessage.senderUUID} senderSeqNo: ${cordaMessage.senderSeqNo} isSessionInit: ${cordaMessage.isSessionInit}" } } } }