diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index a35cadfa85..d87ad36f9c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -27,9 +27,14 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.* +import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport +import net.corda.node.internal.artemis.ReactiveArtemisConsumer import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration @@ -39,11 +44,14 @@ import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.* +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -53,7 +61,12 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.* +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import rx.Observable import rx.Subscription @@ -483,8 +496,8 @@ class P2PMessagingClient(val config: NodeConfiguration, running = false runningFuture = openFuture() networkChangeSubscription?.unsubscribe() - require(p2pConsumer != null, {"stop can't be called twice"}) - require(producer != null, {"stop can't be called twice"}) + require(p2pConsumer != null, { "stop can't be called twice" }) + require(producer != null, { "stop can't be called twice" }) close(p2pConsumer) p2pConsumer = null @@ -642,30 +655,30 @@ private class P2PMessagingConsumer( private val drainingModeWasChangedEvents: Observable>) : LifecycleSupport { private companion object { - private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" - private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" + private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" } private var startedFlag = false val messages: PublishSubject = PublishSubject.create() - private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages) - private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages) + private val existingOnlyConsumer = multiplex(queueNames, createSession, initialSessionMessages) + private val initialAndExistingConsumer = multiplex(queueNames, createSession) private val subscriptions = mutableSetOf() override fun start() { synchronized(this) { require(!startedFlag) - drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe() - drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe() - subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe() - subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe() - if (!isDrainingModeOn()) { - initialConsumer.start() + drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe() + drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe() + subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe() + subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe() + if (isDrainingModeOn()) { + existingOnlyConsumer.start() + } else { + initialAndExistingConsumer.start() } - existingConsumer.start() startedFlag = true } } @@ -674,8 +687,8 @@ private class P2PMessagingConsumer( synchronized(this) { if (startedFlag) { - initialConsumer.stop() - existingConsumer.stop() + existingOnlyConsumer.stop() + initialAndExistingConsumer.stop() subscriptions.forEach(Subscription::unsubscribe) subscriptions.clear() startedFlag = false @@ -687,25 +700,16 @@ private class P2PMessagingConsumer( override val started: Boolean get() = startedFlag - - private fun pauseInitial() { - - if (initialConsumer.started && initialConsumer.connected) { - initialConsumer.disconnect() - } - } - - private fun resumeInitial() { - - if(!initialConsumer.started) { - initialConsumer.start() - } - if (!initialConsumer.connected) { - initialConsumer.connect() - } - } - private fun Pair.switchedOff() = first && !second private fun Pair.switchedOn() = !first && second } + +private fun ReactiveArtemisConsumer.switchTo(other: ReactiveArtemisConsumer) { + + disconnect() + when { + !other.started -> other.start() + !other.connected -> other.connect() + } +} \ No newline at end of file