[CORDA-1235]: Prevent out of order consumption of initiation vs subsequent messages (#2850)

This commit is contained in:
Michele Sollecito 2018-03-21 11:24:53 +00:00 committed by GitHub
parent f1ac3c39e9
commit ce8771900e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -17,6 +17,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
import net.corda.node.internal.LifecycleSupport
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
import net.corda.node.internal.artemis.ReactiveArtemisConsumer.Companion.multiplex
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
@ -418,7 +419,7 @@ class P2PMessagingClient(private val config: NodeConfiguration,
state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topic.isBlank() || it.topic== msg.topic }
val deliverTo = handlers.filter { it.topic.isBlank() || it.topic == msg.topic }
try {
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
@ -469,8 +470,8 @@ class P2PMessagingClient(private val config: NodeConfiguration,
val prevRunning = running
running = false
networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, {"stop can't be called twice"})
require(producer != null, {"stop can't be called twice"})
require(p2pConsumer != null, { "stop can't be called twice" })
require(producer != null, { "stop can't be called twice" })
close(p2pConsumer)
p2pConsumer = null
@ -506,7 +507,7 @@ class P2PMessagingClient(private val config: NodeConfiguration,
override fun close() = stop()
override fun send(message: Message, target: MessageRecipients, retryId: Long?, sequenceKey: Any, additionalHeaders: Map<String, String>) {
sendInternal(message, target, retryId, additionalHeaders)
sendInternal(message, target, retryId, additionalHeaders)
}
private fun sendInternal(message: Message, target: MessageRecipients, retryId: Long?, additionalHeaders: Map<String, String> = emptyMap()) {
@ -528,7 +529,7 @@ class P2PMessagingClient(private val config: NodeConfiguration,
if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
additionalHeaders.forEach { key, value -> putStringProperty(key, value)}
additionalHeaders.forEach { key, value -> putStringProperty(key, value) }
}
log.trace {
"Send to: $mqAddress topic: ${message.topic} uuid: ${message.uniqueMessageId}"
@ -598,7 +599,8 @@ class P2PMessagingClient(private val config: NodeConfiguration,
} else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
val internalTargetQueue = (target as? ArtemisAddress)?.queueName
?: throw IllegalArgumentException("Not an Artemis address")
state.locked {
createQueueIfAbsent(internalTargetQueue, producerSession!!)
}
@ -660,30 +662,30 @@ private class P2PMessagingConsumer(
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
private companion object {
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}='${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
private const val existingSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
}
private var startedFlag = false
val messages: PublishSubject<ClientMessage> = PublishSubject.create<ClientMessage>()
private var initialConsumer = multiplex(queueNames, createSession, initialSessionMessages)
private var existingConsumer = multiplex(queueNames, createSession, existingSessionMessages)
private val existingOnlyConsumer = multiplex(queueNames, createSession, initialSessionMessages)
private val initialAndExistingConsumer = multiplex(queueNames, createSession)
private val subscriptions = mutableSetOf<Subscription>()
override fun start() {
synchronized(this) {
require(!startedFlag)
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { pauseInitial() }.subscribe()
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { resumeInitial() }.subscribe()
subscriptions += initialConsumer.messages.doOnNext(messages::onNext).subscribe()
subscriptions += existingConsumer.messages.doOnNext(messages::onNext).subscribe()
if (!isDrainingModeOn()) {
initialConsumer.start()
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe()
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe()
subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe()
subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe()
if (isDrainingModeOn()) {
existingOnlyConsumer.start()
} else {
initialAndExistingConsumer.start()
}
existingConsumer.start()
startedFlag = true
}
}
@ -692,8 +694,8 @@ private class P2PMessagingConsumer(
synchronized(this) {
if (startedFlag) {
initialConsumer.stop()
existingConsumer.stop()
existingOnlyConsumer.stop()
initialAndExistingConsumer.stop()
subscriptions.forEach(Subscription::unsubscribe)
subscriptions.clear()
startedFlag = false
@ -705,25 +707,16 @@ private class P2PMessagingConsumer(
override val started: Boolean
get() = startedFlag
private fun pauseInitial() {
if (initialConsumer.started && initialConsumer.connected) {
initialConsumer.disconnect()
}
}
private fun resumeInitial() {
if(!initialConsumer.started) {
initialConsumer.start()
}
if (!initialConsumer.connected) {
initialConsumer.connect()
}
}
private fun Pair<Boolean, Boolean>.switchedOff() = first && !second
private fun Pair<Boolean, Boolean>.switchedOn() = !first && second
}
private fun ReactiveArtemisConsumer.switchTo(other: ReactiveArtemisConsumer) {
disconnect()
when {
!other.started -> other.start()
!other.connected -> other.connect()
}
}