diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt
index b115201602..16a1a34436 100644
--- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt
+++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt
@@ -17,6 +17,7 @@ import net.corda.core.serialization.serialize
 import net.corda.core.utilities.*
 import net.corda.node.VersionInfo
 import net.corda.node.internal.LifecycleSupport
+import net.corda.node.internal.artemis.ReactiveArtemisConsumer
 import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
 import net.corda.node.services.api.NetworkMapCacheInternal
 import net.corda.node.services.config.NodeConfiguration
@@ -418,7 +419,7 @@ class P2PMessagingClient(private val config: NodeConfiguration,
         state.checkNotLocked()
         // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
         // or removed whilst the filter is executing will not affect anything.
-        val deliverTo = handlers.filter { it.topic.isBlank() || it.topic== msg.topic }
+        val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic }
         try {
             // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
             // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
@@ -469,8 +470,8 @@ class P2PMessagingClient(private val config: NodeConfiguration,
             val prevRunning = running
             running = false
             networkChangeSubscription?.unsubscribe()
-            require(p2pConsumer != null, {"stop can't be called twice"})
-            require(producer != null, {"stop can't be called twice"})
+            require(p2pConsumer != null, { "stop can't be called twice" })
+            require(producer != null, { "stop can't be called twice" })
 
             close(p2pConsumer)
             p2pConsumer = null
@@ -506,7 +507,7 @@ class P2PMessagingClient(private val config: NodeConfiguration,
     override fun close() = stop()
 
     override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map<String, String>) {
-       sendInternal(message, target, retryId, additionalHeaders)
+        sendInternal(message, target, retryId, additionalHeaders)
     }
 
     private fun sendInternal(message: Message, target: MessageRecipients, retryId: Long?, additionalHeaders: Map<String, String> = emptyMap()) {
@@ -528,7 +529,7 @@ class P2PMessagingClient(private val config: NodeConfiguration,
                     if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) {
                         putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
                     }
-                    additionalHeaders.forEach { key, value -> putStringProperty(key, value)}
+                    additionalHeaders.forEach { key, value -> putStringProperty(key, value) }
                 }
                 log.trace {
                     "Send to: $mqAddress topic: ${message.topic} uuid: ${message.uniqueMessageId}"
@@ -598,7 +599,8 @@ class P2PMessagingClient(private val config: NodeConfiguration,
         } else {
             // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
             // broker's job to route the message to the target's P2P queue.
-            val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
+            val internalTargetQueue = (target as? ArtemisAddress)?.queueName
+                    ?: throw IllegalArgumentException("Not an Artemis address")
             state.locked {
                 createQueueIfAbsent(internalTargetQueue, producerSession!!)
             }
@@ -660,30 +662,30 @@ private class P2PMessagingConsumer(
         private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
 
     private companion object {
-        private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
-        private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
+        private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
     }
 
     private var startedFlag = false
 
     val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
 
-    private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages)
-    private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages)
+    private val existingOnlyConsumer = multiplex(queueNames, createSession, initialSessionMessages)
+    private val initialAndExistingConsumer = multiplex(queueNames, createSession)
     private val subscriptions = mutableSetOf<Subscription>()
 
     override fun start() {
 
         synchronized(this) {
             require(!startedFlag)
-            drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe()
-            drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe()
-            subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe()
-            subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe()
-            if (!isDrainingModeOn()) {
-                initialConsumer.start()
+            drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe()
+            drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe()
+            subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe()
+            subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe()
+            if (isDrainingModeOn()) {
+                existingOnlyConsumer.start()
+            } else {
+                initialAndExistingConsumer.start()
             }
-            existingConsumer.start()
             startedFlag = true
         }
     }
@@ -692,8 +694,8 @@ private class P2PMessagingConsumer(
 
         synchronized(this) {
             if (startedFlag) {
-                initialConsumer.stop()
-                existingConsumer.stop()
+                existingOnlyConsumer.stop()
+                initialAndExistingConsumer.stop()
                 subscriptions.forEach(Subscription::unsubscribe)
                 subscriptions.clear()
                 startedFlag = false
@@ -705,25 +707,16 @@ private class P2PMessagingConsumer(
     override val started: Boolean
         get() = startedFlag
 
-
-    private fun pauseInitial() {
-
-        if (initialConsumer.started && initialConsumer.connected) {
-            initialConsumer.disconnect()
-        }
-    }
-
-    private fun resumeInitial() {
-
-        if(!initialConsumer.started) {
-            initialConsumer.start()
-        }
-        if (!initialConsumer.connected) {
-            initialConsumer.connect()
-        }
-    }
-
     private fun Pair<Boolean, Boolean>.switchedOff() = first && !second
 
     private fun Pair<Boolean, Boolean>.switchedOn() = !first && second
+}
+
+private fun ReactiveArtemisConsumer.switchTo(other: ReactiveArtemisConsumer) {
+
+    disconnect()
+    when {
+        !other.started -> other.start()
+        !other.connected -> other.connect()
+    }
 }
\ No newline at end of file