From 3b7f0aff92db1913e16e842361afc0ae94430fb1 Mon Sep 17 00:00:00 2001 From: Stefan Iliev <46542846+StefanIliev545@users.noreply.github.com> Date: Mon, 3 Jun 2019 11:49:29 +0100 Subject: [PATCH] ENT-3484 - Added periodic log.warn message to remind that the node has been set into draining mode. (#5151) * Added periodic message logged from a timer which is switched on and off by the node drain events. * Timer also spawns on start() to ensure that if a node is shutdown in draining mode and started with it, it will continue logging messages. --- .../draining/P2PFlowsDrainingModeTest.kt | 3 +++ .../services/messaging/P2PMessagingClient.kt | 25 ++++++++++++++++--- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt index 3fed0afbe4..f59d025c8b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt @@ -7,7 +7,9 @@ import net.corda.core.internal.concurrent.map import net.corda.core.messaging.startFlow import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap +import net.corda.node.logging.logFile import net.corda.node.services.Permissions import net.corda.nodeapi.internal.hasCancelledDrainingShutdown import net.corda.testing.core.ALICE_NAME @@ -19,6 +21,7 @@ import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.internal.chooseIdentity import net.corda.testing.node.User import net.corda.testing.node.internal.waitForShutdown +import org.assertj.core.api.Assertions import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat import org.junit.After import org.junit.Before diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index df6fce26e8..7065728ed0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -56,6 +56,7 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import javax.annotation.concurrent.ThreadSafe +import kotlin.concurrent.timer /** * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product. @@ -193,7 +194,7 @@ class P2PMessagingClient(val config: NodeConfiguration, inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) } - p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents) + p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents, metricRegistry) messagingExecutor = MessagingExecutor( executorSession!!, @@ -573,10 +574,12 @@ private class P2PMessagingConsumer( queueNames: Set, createSession: () -> ClientSession, private val isDrainingModeOn: () -> Boolean, - private val drainingModeWasChangedEvents: Observable>) : LifecycleSupport { + private val drainingModeWasChangedEvents: Observable>, + private val metricsRegistry : MetricRegistry) : LifecycleSupport { private companion object { private const val initialSessionMessages = "${P2PMessagingHeaders.Type.KEY}<>'${P2PMessagingHeaders.Type.SESSION_INIT_VALUE}'" + private val logger by lazy { loggerFor() } } private var startedFlag = false @@ -587,16 +590,30 @@ private class P2PMessagingConsumer( private val initialAndExistingConsumer = multiplex(queueNames, createSession) private val subscriptions = mutableSetOf() + private var notificationTimer : Timer? = null + private fun scheduleDrainNotificationTimer() { + notificationTimer = timer("DrainNotificationTimer", true, 10.seconds.toMillis(), 1.minutes.toMillis()) { + logger.warn("Node is currently in draining mode, new flows will not be processed! Flows in flight: ${metricsRegistry.gauges["Flows.InFlight"]?.value}") + } + } + override fun start() { synchronized(this) { require(!startedFlag){"Must not already be started"} - drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { initialAndExistingConsumer.switchTo(existingOnlyConsumer) }.subscribe() - drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { existingOnlyConsumer.switchTo(initialAndExistingConsumer) }.subscribe() + drainingModeWasChangedEvents.filter { change -> change.switchedOn() }.doOnNext { + initialAndExistingConsumer.switchTo(existingOnlyConsumer) + scheduleDrainNotificationTimer() + }.subscribe() + drainingModeWasChangedEvents.filter { change -> change.switchedOff() }.doOnNext { + existingOnlyConsumer.switchTo(initialAndExistingConsumer) + notificationTimer?.cancel() + }.subscribe() subscriptions += existingOnlyConsumer.messages.doOnNext(messages::onNext).subscribe() subscriptions += initialAndExistingConsumer.messages.doOnNext(messages::onNext).subscribe() if (isDrainingModeOn()) { existingOnlyConsumer.start() + scheduleDrainNotificationTimer() } else { initialAndExistingConsumer.start() }