mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
ENT-8811 OS port of flow draining fixes (#7269)
This commit is contained in:
parent
f8f374ec8c
commit
188027c1f5
@ -24,6 +24,10 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa
|
||||
private val beingProcessedMessages = ConcurrentHashMap<DeduplicationId, MessageMeta>()
|
||||
private val processedMessages = createProcessedMessages(cacheFactory)
|
||||
|
||||
enum class Outcome {
|
||||
NEW, DUPLICATE, IN_FLIGHT
|
||||
}
|
||||
|
||||
private fun createProcessedMessages(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
cacheFactory = cacheFactory,
|
||||
@ -48,15 +52,17 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa
|
||||
private fun senderHash(senderKey: SenderKey) = SecureHash.sha256(senderKey.peer.toString() + senderKey.isSessionInit.toString() + senderKey.senderUUID).toString()
|
||||
|
||||
/**
|
||||
* @return true if we have seen this message before.
|
||||
* @return IN_FLIGHT if this message is currently being processed by the state machine, otherwise indicate if DUPLICATE or NEW.
|
||||
*/
|
||||
fun isDuplicate(msg: ReceivedMessage): Boolean {
|
||||
fun checkDuplicate(msg: ReceivedMessage): Outcome {
|
||||
if (beingProcessedMessages.containsKey(msg.uniqueMessageId)) {
|
||||
return true
|
||||
return Outcome.IN_FLIGHT
|
||||
}
|
||||
return isDuplicateInDatabase(msg)
|
||||
return booleanToEnum(isDuplicateInDatabase(msg))
|
||||
}
|
||||
|
||||
private fun booleanToEnum(isDuplicate: Boolean): Outcome = if (isDuplicate) Outcome.DUPLICATE else Outcome.NEW
|
||||
|
||||
/**
|
||||
* Called the first time we encounter [deduplicationId].
|
||||
*/
|
||||
|
@ -18,7 +18,15 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.internal.LifecycleSupport
|
||||
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
|
||||
@ -31,12 +39,15 @@ import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.errorAndTerminate
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
|
||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||
@ -65,7 +76,9 @@ import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.Collections
|
||||
import java.util.Timer
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -440,12 +453,15 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
|
||||
internal fun deliver(artemisMessage: ClientMessage) {
|
||||
artemisToCordaMessage(artemisMessage)?.let { cordaMessage ->
|
||||
if (!deduplicator.isDuplicate(cordaMessage)) {
|
||||
val outcome = deduplicator.checkDuplicate(cordaMessage)
|
||||
if (outcome == P2PMessageDeduplicator.Outcome.NEW) {
|
||||
deduplicator.signalMessageProcessStart(cordaMessage)
|
||||
deliver(cordaMessage, artemisMessage)
|
||||
} else {
|
||||
log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" }
|
||||
} else if (outcome == P2PMessageDeduplicator.Outcome.DUPLICATE) {
|
||||
log.debug { "Acknowledge duplicate message id: ${cordaMessage.uniqueMessageId} senderUUID: ${cordaMessage.senderUUID} senderSeqNo: ${cordaMessage.senderSeqNo} isSessionInit: ${cordaMessage.isSessionInit}" }
|
||||
messagingExecutor!!.acknowledge(artemisMessage)
|
||||
} else {
|
||||
log.debug { "Discard in-flight message id: ${cordaMessage.uniqueMessageId} senderUUID: ${cordaMessage.senderUUID} senderSeqNo: ${cordaMessage.senderSeqNo} isSessionInit: ${cordaMessage.isSessionInit}" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user