mirror of
https://github.com/corda/corda.git
synced 2025-01-31 00:24:59 +00:00
ENT-3484 - Added periodic log.warn message to remind that the node has been set into draining mode. (#5151)
* Added periodic message logged from a timer which is switched on and off by the node drain events. * Timer also spawns on start() to ensure that if a node is shutdown in draining mode and started with it, it will continue logging messages.
This commit is contained in:
parent
d6ea66d0ab
commit
3b7f0aff92
@ -7,7 +7,9 @@ import net.corda.core.internal.concurrent.map
|
|||||||
import net.corda.core.messaging.startFlow
|
import net.corda.core.messaging.startFlow
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.core.utilities.unwrap
|
import net.corda.core.utilities.unwrap
|
||||||
|
import net.corda.node.logging.logFile
|
||||||
import net.corda.node.services.Permissions
|
import net.corda.node.services.Permissions
|
||||||
import net.corda.nodeapi.internal.hasCancelledDrainingShutdown
|
import net.corda.nodeapi.internal.hasCancelledDrainingShutdown
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
@ -19,6 +21,7 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
|
|||||||
import net.corda.testing.internal.chooseIdentity
|
import net.corda.testing.internal.chooseIdentity
|
||||||
import net.corda.testing.node.User
|
import net.corda.testing.node.User
|
||||||
import net.corda.testing.node.internal.waitForShutdown
|
import net.corda.testing.node.internal.waitForShutdown
|
||||||
|
import org.assertj.core.api.Assertions
|
||||||
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
|
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
|
@ -56,6 +56,7 @@ import java.util.*
|
|||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
import kotlin.concurrent.timer
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
|
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
|
||||||
@ -193,7 +194,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
|
|
||||||
inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) }
|
inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) }
|
||||||
|
|
||||||
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
|
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents, metricRegistry)
|
||||||
|
|
||||||
messagingExecutor = MessagingExecutor(
|
messagingExecutor = MessagingExecutor(
|
||||||
executorSession!!,
|
executorSession!!,
|
||||||
@ -573,10 +574,12 @@ private class P2PMessagingConsumer(
|
|||||||
queueNames: Set<String>,
|
queueNames: Set<String>,
|
||||||
createSession: () -> ClientSession,
|
createSession: () -> ClientSession,
|
||||||
private val isDrainingModeOn: () -> Boolean,
|
private val isDrainingModeOn: () -> Boolean,
|
||||||
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>) : LifecycleSupport {
|
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>,
|
||||||
|
private val metricsRegistry : MetricRegistry) : LifecycleSupport {
|
||||||
|
|
||||||
private companion object {
|
private companion object {
|
||||||
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
|
private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'"
|
||||||
|
private val logger by lazy { loggerFor<P2PMessagingClient>() }
|
||||||
}
|
}
|
||||||
|
|
||||||
private var startedFlag = false
|
private var startedFlag = false
|
||||||
@ -587,16 +590,30 @@ private class P2PMessagingConsumer(
|
|||||||
private val initialAndExistingConsumer = multiplex(queueNames, createSession)
|
private val initialAndExistingConsumer = multiplex(queueNames, createSession)
|
||||||
private val subscriptions = mutableSetOf<Subscription>()
|
private val subscriptions = mutableSetOf<Subscription>()
|
||||||
|
|
||||||
|
private var notificationTimer : Timer? = null
|
||||||
|
private fun scheduleDrainNotificationTimer() {
|
||||||
|
notificationTimer = timer("DrainNotificationTimer", true, 10.seconds.toMillis(), 1.minutes.toMillis()) {
|
||||||
|
logger.warn("Node is currently in draining mode, new flows will not be processed! Flows in flight: ${metricsRegistry.gauges["Flows.InFlight"]?.value}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun start() {
|
override fun start() {
|
||||||
|
|
||||||
synchronized(this) {
|
synchronized(this) {
|
||||||
require(!startedFlag){"Must not already be started"}
|
require(!startedFlag){"Must not already be started"}
|
||||||
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe()
|
drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext {
|
||||||
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe()
|
initialAndExistingConsumer.switchTo(existingOnlyConsumer)
|
||||||
|
scheduleDrainNotificationTimer()
|
||||||
|
}.subscribe()
|
||||||
|
drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext {
|
||||||
|
existingOnlyConsumer.switchTo(initialAndExistingConsumer)
|
||||||
|
notificationTimer?.cancel()
|
||||||
|
}.subscribe()
|
||||||
subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe()
|
subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe()
|
||||||
subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe()
|
subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe()
|
||||||
if (isDrainingModeOn()) {
|
if (isDrainingModeOn()) {
|
||||||
existingOnlyConsumer.start()
|
existingOnlyConsumer.start()
|
||||||
|
scheduleDrainNotificationTimer()
|
||||||
} else {
|
} else {
|
||||||
initialAndExistingConsumer.start()
|
initialAndExistingConsumer.start()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user