From 11efe8ca1b06a8114c50fafa57f98407eff38bd2 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Mon, 5 Sep 2016 14:42:27 +0100 Subject: [PATCH] Extend Rick's closeOnShutdown so that all the shutdown that was being done manually is done through the same mechanism. Thus allowing for a consistent reverse ordering and hopefully preventing DB shutdown happening ahead of serverThread shutdown. Follow Rick's PR suggestions Preserve comment about abitrary timeout on serverThread stop --- .../com/r3corda/node/internal/AbstractNode.kt | 26 +++++++++++---- .../kotlin/com/r3corda/node/internal/Node.kt | 32 +++++++++++-------- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 3060c3c9ff..e2bc9f76cb 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -2,6 +2,7 @@ package com.r3corda.node.internal import com.codahale.metrics.MetricRegistry import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.RunOnCallerThread import com.r3corda.core.contracts.SignedTransaction @@ -49,13 +50,14 @@ import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.configureDatabase import org.slf4j.Logger -import java.io.Closeable import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path import java.security.KeyPair import java.time.Clock import java.util.* +import java.util.concurrent.ExecutorService +import java.util.concurrent.TimeUnit /** * A base node implementation that can be customised either for production (with real implementations that do real @@ -134,7 +136,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, lateinit var scheduler: SchedulerService lateinit var protocolLogicFactory: ProtocolLogicRefFactory val customServices: ArrayList = ArrayList() - protected val closeOnStop: ArrayList = ArrayList() + protected val runOnStop: ArrayList = ArrayList() /** Locates and returns a service of the given type if loaded, or throws an exception if not found. */ inline fun findService() = customServices.filterIsInstance().single() @@ -166,6 +168,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, checkpointStorage = storageServices.second netMapCache = InMemoryNetworkMapCache() net = makeMessagingService() + runOnStop += Runnable { net.stop() } wallet = makeWalletService() identity = makeIdentityService() @@ -188,6 +191,14 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, listOf(tokenizableServices), checkpointStorage, serverThread) + if (serverThread is ExecutorService) { + runOnStop += Runnable { + // We wait here, even though any in-flight messages should have been drained away because the + // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is + // arbitrary and might be inappropriate. + MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, TimeUnit.SECONDS) + } + } inNodeWalletMonitorService = makeWalletMonitorService() // Note this HAS to be after smm is set buildAdvertisedServices() @@ -213,7 +224,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val (toClose, database) = configureDatabase(props) // Now log the vendor string as this will also cause a connection to be tested eagerly. log.info("Connected to ${database.vendor} database.") - closeOnStop += toClose + runOnStop += Runnable { toClose.close() } } } @@ -354,11 +365,12 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // to indicate "Please shut down gracefully" vs "Shut down now". // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // unsubscribes us forcibly, rather than blocking the shutdown process. - net.stop() - // Stop in opposite order to starting - for (toClose in closeOnStop.reversed()) { - toClose.close() + + // Run shutdown hooks in opposite order to starting + for (toRun in runOnStop.reversed()) { + toRun.run() } + runOnStop.clear() } protected abstract fun makeMessagingService(): MessagingServiceInternal diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 9257056cfb..ccbdec136b 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -2,7 +2,6 @@ package com.r3corda.node.internal import com.codahale.metrics.JmxReporter import com.google.common.net.HostAndPort -import com.google.common.util.concurrent.MoreExecutors import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.ServiceType @@ -30,11 +29,9 @@ import org.glassfish.jersey.server.ServerProperties import org.glassfish.jersey.servlet.ServletContainer import java.io.RandomAccessFile import java.lang.management.ManagementFactory -import java.net.InetSocketAddress import java.nio.channels.FileLock import java.nio.file.Path import java.time.Clock -import java.util.concurrent.TimeUnit import javax.management.ObjectName import kotlin.concurrent.thread @@ -112,6 +109,8 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, // when our process shuts down, but we try in stop() anyway just to be nice. private var nodeFileLock: FileLock? = null + private var shutdownThread: Thread? = null + override fun makeMessagingService(): MessagingServiceInternal { val serverAddr = messagingServerAddr ?: { messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache) @@ -128,6 +127,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, // Start up the embedded MQ server messageBroker?.apply { configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process + runOnStop += Runnable { messageBroker?.stop() } start() bridgeToNetworkMapService(networkMapService) } @@ -187,6 +187,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, } server.handler = handlerCollection + runOnStop += Runnable { server.stop() } server.start() return server } @@ -252,9 +253,10 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, build(). start() - Runtime.getRuntime().addShutdownHook(thread(start = false) { + shutdownThread = thread(start = false) { stop() - }) + } + Runtime.getRuntime().addShutdownHook(shutdownThread) return this } @@ -277,18 +279,20 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, synchronized(this) { if (shutdown) return shutdown = true + + // Unregister shutdown hook to prevent any unnecessary second calls to stop + if(shutdownThread != null) { + Runtime.getRuntime().removeShutdownHook(shutdownThread) + shutdownThread = null + } } log.info("Shutting down ...") - // Shut down the web server. - webServer.stop() - // Terminate the messaging system. This will block until messages that are in-flight have finished being - // processed so it may take a moment. + + // All the Node started subsystems were registered with the runOnStop list at creation. + // So now simply call the parent to stop everything in reverse order. + // In particular this prevents premature shutdown of the Database by AbstractNode whilst the serverThread is active super.stop() - // We do another wait here, even though any in-flight messages have been drained away now because the - // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is - // arbitrary and might be inappropriate. - MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS) - messageBroker?.stop() + nodeFileLock!!.release() log.info("Shutdown complete") }