From ce8771900eec874f12daa7d49f54c1febad03794 Mon Sep 17 00:00:00 2001 From: Michele Sollecito Date: Wed, 21 Mar 2018 11:24:53 +0000 Subject: [PATCH] [CORDA-1235]: Prevent out of order consumption of initiation vs subsequent messages (#2850) --- .../services/messaging/P2PMessagingClient.kt | 67 +++++++++---------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index b115201602..16a1a34436 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -17,6 +17,7 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.* import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport +import net.corda.node.internal.artemis.ReactiveArtemisConsumer import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration @@ -418,7 +419,7 @@ class P2PMessagingClient(private val config: NodeConfiguration, state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. - val deliverTo = handlers.filter { it.topic.isBlank() || it.topic== msg.topic } + val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic } try { // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler @@ -469,8 +470,8 @@ class P2PMessagingClient(private val config: NodeConfiguration, val prevRunning = running running = false networkChangeSubscription?.unsubscribe() - require(p2pConsumer != null, {"stop can't be called twice"}) - require(producer != null, {"stop can't be called twice"}) + require(p2pConsumer != null, { "stop can't be called twice" }) + require(producer != null, { "stop can't be called twice" }) close(p2pConsumer) p2pConsumer = null @@ -506,7 +507,7 @@ class P2PMessagingClient(private val config: NodeConfiguration, override fun close() = stop() override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map) { - sendInternal(message, target, retryId, additionalHeaders) + sendInternal(message, target, retryId, additionalHeaders) } private fun sendInternal(message: Message, target: MessageRecipients, retryId: Long?, additionalHeaders: Map = emptyMap()) { @@ -528,7 +529,7 @@ class P2PMessagingClient(private val config: NodeConfiguration, if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) { putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis) } - additionalHeaders.forEach { key, value -> putStringProperty(key, value)} + additionalHeaders.forEach { key, value -> putStringProperty(key, value) } } log.trace { "Send to: $mqAddress topic: ${message.topic} uuid: ${message.uniqueMessageId}" @@ -598,7 +599,8 @@ class P2PMessagingClient(private val config: NodeConfiguration, } else { // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the // broker's job to route the message to the target's P2P queue. - val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") + val internalTargetQueue = (target as? ArtemisAddress)?.queueName + ?: throw IllegalArgumentException("Not an Artemis address") state.locked { createQueueIfAbsent(internalTargetQueue, producerSession!!) } @@ -660,30 +662,30 @@ private class P2PMessagingConsumer( private val drainingModeWasChangedEvents: Observable>) : LifecycleSupport { private companion object { - private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" - private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" + private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" } private var startedFlag = false val messages: PublishSubject = PublishSubject.create() - private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages) - private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages) + private val existingOnlyConsumer = multiplex(queueNames, createSession, initialSessionMessages) + private val initialAndExistingConsumer = multiplex(queueNames, createSession) private val subscriptions = mutableSetOf() override fun start() { synchronized(this) { require(!startedFlag) - drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe() - drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe() - subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe() - subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe() - if (!isDrainingModeOn()) { - initialConsumer.start() + drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe() + drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe() + subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe() + subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe() + if (isDrainingModeOn()) { + existingOnlyConsumer.start() + } else { + initialAndExistingConsumer.start() } - existingConsumer.start() startedFlag = true } } @@ -692,8 +694,8 @@ private class P2PMessagingConsumer( synchronized(this) { if (startedFlag) { - initialConsumer.stop() - existingConsumer.stop() + existingOnlyConsumer.stop() + initialAndExistingConsumer.stop() subscriptions.forEach(Subscription::unsubscribe) subscriptions.clear() startedFlag = false @@ -705,25 +707,16 @@ private class P2PMessagingConsumer( override val started: Boolean get() = startedFlag - - private fun pauseInitial() { - - if (initialConsumer.started && initialConsumer.connected) { - initialConsumer.disconnect() - } - } - - private fun resumeInitial() { - - if(!initialConsumer.started) { - initialConsumer.start() - } - if (!initialConsumer.connected) { - initialConsumer.connect() - } - } - private fun Pair.switchedOff() = first && !second private fun Pair.switchedOn() = !first && second +} + +private fun ReactiveArtemisConsumer.switchTo(other: ReactiveArtemisConsumer) { + + disconnect() + when { + !other.started -> other.start() + !other.connected -> other.connect() + } } \ No newline at end of file