mirror of
https://github.com/corda/corda.git
synced 2024-12-29 09:18:58 +00:00
Merge remote-tracking branch 'remotes/open/master' into merges/march-21-13-45
# Conflicts: # node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt
This commit is contained in:
commit
3887838bc5
@ -27,9 +27,14 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
|||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.ByteSequence
|
||||||
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import net.corda.core.utilities.OpaqueBytes
|
||||||
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import net.corda.core.utilities.trace
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
import net.corda.node.internal.LifecycleSupport
|
import net.corda.node.internal.LifecycleSupport
|
||||||
|
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
|
||||||
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
|
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
@ -39,11 +44,14 @@ import net.corda.node.utilities.PersistentMap
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
import net.corda.nodeapi.ConnectionDirection
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -53,7 +61,12 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
|||||||
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.*
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
@ -483,8 +496,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
running = false
|
running = false
|
||||||
runningFuture = openFuture()
|
runningFuture = openFuture()
|
||||||
networkChangeSubscription?.unsubscribe()
|
networkChangeSubscription?.unsubscribe()
|
||||||
require(p2pConsumer != null, {"stop can't be called twice"})
|
require(p2pConsumer != null, { "stop can't be called twice" })
|
||||||
require(producer != null, {"stop can't be called twice"})
|
require(producer != null, { "stop can't be called twice" })
|
||||||
|
|
||||||
close(p2pConsumer)
|
close(p2pConsumer)
|
||||||
p2pConsumer = null
|
p2pConsumer = null
|
||||||
@ -642,30 +655,30 @@ private class P2PMessagingConsumer(
|
|||||||
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
|
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
|
||||||
|
|
||||||
private companion object {
|
private companion object {
|
||||||
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
|
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
|
||||||
private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private var startedFlag = false
|
private var startedFlag = false
|
||||||
|
|
||||||
val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
|
val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
|
||||||
|
|
||||||
private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages)
|
private val existingOnlyConsumer = multiplex(queueNames, createSession, initialSessionMessages)
|
||||||
private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages)
|
private val initialAndExistingConsumer = multiplex(queueNames, createSession)
|
||||||
private val subscriptions = mutableSetOf<Subscription>()
|
private val subscriptions = mutableSetOf<Subscription>()
|
||||||
|
|
||||||
override fun start() {
|
override fun start() {
|
||||||
|
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
require(!startedFlag)
|
require(!startedFlag)
|
||||||
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe()
|
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe()
|
||||||
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe()
|
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe()
|
||||||
subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe()
|
subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe()
|
||||||
subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe()
|
subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe()
|
||||||
if (!isDrainingModeOn()) {
|
if (isDrainingModeOn()) {
|
||||||
initialConsumer.start()
|
existingOnlyConsumer.start()
|
||||||
|
} else {
|
||||||
|
initialAndExistingConsumer.start()
|
||||||
}
|
}
|
||||||
existingConsumer.start()
|
|
||||||
startedFlag = true
|
startedFlag = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -674,8 +687,8 @@ private class P2PMessagingConsumer(
|
|||||||
|
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
if (startedFlag) {
|
if (startedFlag) {
|
||||||
initialConsumer.stop()
|
existingOnlyConsumer.stop()
|
||||||
existingConsumer.stop()
|
initialAndExistingConsumer.stop()
|
||||||
subscriptions.forEach(Subscription::unsubscribe)
|
subscriptions.forEach(Subscription::unsubscribe)
|
||||||
subscriptions.clear()
|
subscriptions.clear()
|
||||||
startedFlag = false
|
startedFlag = false
|
||||||
@ -687,25 +700,16 @@ private class P2PMessagingConsumer(
|
|||||||
override val started: Boolean
|
override val started: Boolean
|
||||||
get() = startedFlag
|
get() = startedFlag
|
||||||
|
|
||||||
|
|
||||||
private fun pauseInitial() {
|
|
||||||
|
|
||||||
if (initialConsumer.started && initialConsumer.connected) {
|
|
||||||
initialConsumer.disconnect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun resumeInitial() {
|
|
||||||
|
|
||||||
if(!initialConsumer.started) {
|
|
||||||
initialConsumer.start()
|
|
||||||
}
|
|
||||||
if (!initialConsumer.connected) {
|
|
||||||
initialConsumer.connect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun Pair<Boolean, Boolean>.switchedOff() = first && !second
|
private fun Pair<Boolean, Boolean>.switchedOff() = first && !second
|
||||||
|
|
||||||
private fun Pair<Boolean, Boolean>.switchedOn() = !first && second
|
private fun Pair<Boolean, Boolean>.switchedOn() = !first && second
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun ReactiveArtemisConsumer.switchTo(other: ReactiveArtemisConsumer) {
|
||||||
|
|
||||||
|
disconnect()
|
||||||
|
when {
|
||||||
|
!other.started -> other.start()
|
||||||
|
!other.connected -> other.connect()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user