Merged in mnesbit-cleanup-shutdown-in-reverse-order (pull request )

Shutdown node components in reverse order to startup
This commit is contained in:
Matthew Nesbit 2016-09-06 09:32:58 +01:00
commit c6d16ea609
2 changed files with 37 additions and 21 deletions
node/src/main/kotlin/com/r3corda/node/internal

@ -2,6 +2,7 @@ package com.r3corda.node.internal
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RunOnCallerThread import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
@ -49,13 +50,14 @@ import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor
import com.r3corda.node.utilities.configureDatabase import com.r3corda.node.utilities.configureDatabase
import org.slf4j.Logger import org.slf4j.Logger
import java.io.Closeable
import java.nio.file.FileAlreadyExistsException import java.nio.file.FileAlreadyExistsException
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.security.KeyPair import java.security.KeyPair
import java.time.Clock import java.time.Clock
import java.util.* import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
/** /**
* A base node implementation that can be customised either for production (with real implementations that do real * A base node implementation that can be customised either for production (with real implementations that do real
@ -134,7 +136,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
lateinit var scheduler: SchedulerService lateinit var scheduler: SchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory lateinit var protocolLogicFactory: ProtocolLogicRefFactory
val customServices: ArrayList<Any> = ArrayList() val customServices: ArrayList<Any> = ArrayList()
protected val closeOnStop: ArrayList<Closeable> = ArrayList() protected val runOnStop: ArrayList<Runnable> = ArrayList()
/** Locates and returns a service of the given type if loaded, or throws an exception if not found. */ /** Locates and returns a service of the given type if loaded, or throws an exception if not found. */
inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single() inline fun <reified T: Any> findService() = customServices.filterIsInstance<T>().single()
@ -166,6 +168,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
checkpointStorage = storageServices.second checkpointStorage = storageServices.second
netMapCache = InMemoryNetworkMapCache() netMapCache = InMemoryNetworkMapCache()
net = makeMessagingService() net = makeMessagingService()
runOnStop += Runnable { net.stop() }
wallet = makeWalletService() wallet = makeWalletService()
identity = makeIdentityService() identity = makeIdentityService()
@ -188,6 +191,14 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
listOf(tokenizableServices), listOf(tokenizableServices),
checkpointStorage, checkpointStorage,
serverThread) serverThread)
if (serverThread is ExecutorService) {
runOnStop += Runnable {
// We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, TimeUnit.SECONDS)
}
}
inNodeWalletMonitorService = makeWalletMonitorService() // Note this HAS to be after smm is set inNodeWalletMonitorService = makeWalletMonitorService() // Note this HAS to be after smm is set
buildAdvertisedServices() buildAdvertisedServices()
@ -213,7 +224,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
val (toClose, database) = configureDatabase(props) val (toClose, database) = configureDatabase(props)
// Now log the vendor string as this will also cause a connection to be tested eagerly. // Now log the vendor string as this will also cause a connection to be tested eagerly.
log.info("Connected to ${database.vendor} database.") log.info("Connected to ${database.vendor} database.")
closeOnStop += toClose runOnStop += Runnable { toClose.close() }
} }
} }
@ -354,11 +365,12 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
// to indicate "Please shut down gracefully" vs "Shut down now". // to indicate "Please shut down gracefully" vs "Shut down now".
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
// unsubscribes us forcibly, rather than blocking the shutdown process. // unsubscribes us forcibly, rather than blocking the shutdown process.
net.stop()
// Stop in opposite order to starting // Run shutdown hooks in opposite order to starting
for (toClose in closeOnStop.reversed()) { for (toRun in runOnStop.reversed()) {
toClose.close() toRun.run()
} }
runOnStop.clear()
} }
protected abstract fun makeMessagingService(): MessagingServiceInternal protected abstract fun makeMessagingService(): MessagingServiceInternal

@ -2,7 +2,6 @@ package com.r3corda.node.internal
import com.codahale.metrics.JmxReporter import com.codahale.metrics.JmxReporter
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.MoreExecutors
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
@ -30,11 +29,9 @@ import org.glassfish.jersey.server.ServerProperties
import org.glassfish.jersey.servlet.ServletContainer import org.glassfish.jersey.servlet.ServletContainer
import java.io.RandomAccessFile import java.io.RandomAccessFile
import java.lang.management.ManagementFactory import java.lang.management.ManagementFactory
import java.net.InetSocketAddress
import java.nio.channels.FileLock import java.nio.channels.FileLock
import java.nio.file.Path import java.nio.file.Path
import java.time.Clock import java.time.Clock
import java.util.concurrent.TimeUnit
import javax.management.ObjectName import javax.management.ObjectName
import kotlin.concurrent.thread import kotlin.concurrent.thread
@ -112,6 +109,8 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
// when our process shuts down, but we try in stop() anyway just to be nice. // when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null private var nodeFileLock: FileLock? = null
private var shutdownThread: Thread? = null
override fun makeMessagingService(): MessagingServiceInternal { override fun makeMessagingService(): MessagingServiceInternal {
val serverAddr = messagingServerAddr ?: { val serverAddr = messagingServerAddr ?: {
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache) messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache)
@ -128,6 +127,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
// Start up the embedded MQ server // Start up the embedded MQ server
messageBroker?.apply { messageBroker?.apply {
configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process
runOnStop += Runnable { messageBroker?.stop() }
start() start()
bridgeToNetworkMapService(networkMapService) bridgeToNetworkMapService(networkMapService)
} }
@ -187,6 +187,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
} }
server.handler = handlerCollection server.handler = handlerCollection
runOnStop += Runnable { server.stop() }
server.start() server.start()
return server return server
} }
@ -252,9 +253,10 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
build(). build().
start() start()
Runtime.getRuntime().addShutdownHook(thread(start = false) { shutdownThread = thread(start = false) {
stop() stop()
}) }
Runtime.getRuntime().addShutdownHook(shutdownThread)
return this return this
} }
@ -277,18 +279,20 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
synchronized(this) { synchronized(this) {
if (shutdown) return if (shutdown) return
shutdown = true shutdown = true
// Unregister shutdown hook to prevent any unnecessary second calls to stop
if(shutdownThread != null) {
Runtime.getRuntime().removeShutdownHook(shutdownThread)
shutdownThread = null
}
} }
log.info("Shutting down ...") log.info("Shutting down ...")
// Shut down the web server.
webServer.stop() // All the Node started subsystems were registered with the runOnStop list at creation.
// Terminate the messaging system. This will block until messages that are in-flight have finished being // So now simply call the parent to stop everything in reverse order.
// processed so it may take a moment. // In particular this prevents premature shutdown of the Database by AbstractNode whilst the serverThread is active
super.stop() super.stop()
// We do another wait here, even though any in-flight messages have been drained away now because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS)
messageBroker?.stop()
nodeFileLock!!.release() nodeFileLock!!.release()
log.info("Shutdown complete") log.info("Shutdown complete")
} }