ENT-2489: Perform cleaner shutdown of Rpc/Flow workers (#1398)

* ENT-2489: Perform cleaner shutdown of P2P connections.

* ENT-2489: SessionFactory tidy-up

* ENT-2489: Tidy-up RpcFlowWorkerDriver shutdown sequence

* ENT-2489: Fix test compilation issue.

* ENT-2489: Introduce helper function following review from @mnesbit
This commit is contained in:
Viktor Kolomeyko 2018-09-18 17:54:08 +01:00 committed by GitHub
parent 18d4013d0c
commit dbee84c01d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 26 deletions

View File

@ -87,9 +87,9 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val flowWorkerBroker = createFlowWorkerBroker(config, signedNetworkParameters.networkParameters.maxMessageSize)
val rpcWorkerBroker = createRpcWorkerBroker(rpcWorkerConfig, signedNetworkParameters.networkParameters.maxMessageSize)
flowWorkerConfigs.map {
val flowWorkers = flowWorkerConfigs.map {
val (flowWorker, _) = createFlowWorker(it, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
shutdownManager.registerShutdown { flowWorker.stop() }
flowWorker
}
val (rpcWorker, rpcWorkerServiceHub) = createRpcWorker(rpcWorkerConfig, myInfo, signedNetworkParameters, ourKeyPair, trustRoot, nodeCa, rpcWorkerBroker.serverControl)
@ -97,10 +97,12 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val bridge = createBridge(bridgeConfig)
shutdownManager.registerShutdown {
bridge.stop()
// Gracefully shutdown bottom-up, i.e.: FlowWorker, RPC Worker, Brokers, Bridge
flowWorkers.forEach { it.stop() }
rpcWorker.stop()
flowWorkerBroker.stop()
rpcWorkerBroker.stop()
bridge.stop()
}
visibilityHandle.listen(rpcWorkerServiceHub.rpcOps).map {

View File

@ -0,0 +1,8 @@
package net.corda.node.services.messaging
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal
fun ClientSession.stillOpen(): Boolean {
return (!isClosed && (this as? ClientSessionInternal)?.isClosing != false)
}

View File

@ -115,7 +115,9 @@ class MessagingExecutor(
}
}
Job.Shutdown -> {
session.commit()
if(session.stillOpen()) {
session.commit()
}
break@eventLoop
}
}

View File

@ -110,6 +110,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
var bridgeSession: ClientSession? = null
var bridgeNotifyConsumer: ClientConsumer? = null
var networkChangeSubscription: Subscription? = null
var sessionFactory: ClientSessionFactory? = null
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
}
@ -133,8 +134,30 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val handlers = ConcurrentHashMap<String, MessageHandler>()
private val deduplicator = P2PMessageDeduplicator(database)
// Note: Public visibility for testing
var messagingExecutor: MessagingExecutor? = null
private fun failoverCallback(event: FailoverEventType) {
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
log.warn("Connection to the broker was lost. Starting ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} reconnect attempts.")
}
FailoverEventType.FAILOVER_COMPLETED -> {
log.info("Connection to broker re-established.")
}
FailoverEventType.FAILOVER_FAILED -> state.locked {
if (running) {
log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.")
Thread.sleep(config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis())
Runtime.getRuntime().halt(1)
}
}
else -> {
log.warn("Cannot handle event $event.")
}
}
}
/**
* @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages.
* It is also used to construct the myAddress field, which is ultimately advertised in the network map.
@ -171,25 +194,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
initialConnectAttempts = config.enterpriseConfiguration.externalBrokerConnectionConfiguration.initialConnectAttempts
}
}
val sessionFactory = locator!!.createSessionFactory()
sessionFactory.addFailoverListener { event ->
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
log.warn("Connection to the broker was lost. Starting ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} reconnect attempts.")
}
FailoverEventType.FAILOVER_COMPLETED -> {
log.info("Connection to broker re-established.")
}
FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the broker after ${config.enterpriseConfiguration.externalBrokerConnectionConfiguration.reconnectAttempts} attempts. Node is shutting down.")
Thread.sleep(config.enterpriseConfiguration.externalBrokerConnectionConfiguration.retryInterval.toMillis())
Runtime.getRuntime().halt(1)
}
else -> {
log.warn("Cannot handle event $event.")
}
}
}
sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback)
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
@ -455,26 +462,37 @@ class P2PMessagingClient(val config: NodeConfiguration,
* it returns immediately and shutdown is asynchronous.
*/
fun stop() {
log.info("Stopping P2PMessagingClient for: $serverAddress")
val running = state.locked {
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(started)
val prevRunning = running
running = false
networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, { "stop can't be called twice" })
require(producer != null, { "stop can't be called twice" })
require(p2pConsumer != null) { "stop can't be called twice" }
require(producer != null) { "stop can't be called twice" }
close(p2pConsumer)
p2pConsumer = null
close(producer)
producer = null
producerSession!!.commit()
producerSession?.let {
if (it.stillOpen()) {
it.commit()
}
}
close(bridgeNotifyConsumer)
knownQueues.clear()
eventsSubscription?.unsubscribe()
eventsSubscription = null
// Clean-up sessionFactory
sessionFactory?.let {
it.removeFailoverListener(::failoverCallback)
it.close()
}
prevRunning
}
if (running && !nodeExecutor.isOnThread) {
@ -486,6 +504,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
state.locked {
locator?.close()
}
log.info("Stopped P2PMessagingClient for: $serverAddress")
}
private fun close(target: AutoCloseable?) {