From ac81d2aa32e0eb76d73e5a5159fcc796fc372ca3 Mon Sep 17 00:00:00 2001
From: Mike Hearn
Date: Tue, 9 Aug 2016 20:31:35 +0200
Subject: [PATCH] Messaging layer improvements:

- Fix thread safety issues in ArtemisMessagingClient. The Artemis API isn't
  thread safe, but that isn't well documented and it will happily invoke
  callbacks in parallel.
- Add discussion of how we tackle threading currently in the codebase and
  make a few other improvements.
- Add a shutdown hook so we stop properly when the user presses ctrl-c
---
 .../src/main/kotlin/com/r3corda/core/Utils.kt |   1 +
 .../kotlin/com/r3corda/node/driver/Driver.kt  |  15 +-
 .../kotlin/com/r3corda/node/internal/Node.kt  |  60 +++++-
 .../messaging/ArtemisMessagingClient.kt       | 177 ++++++++++--------
 .../node/services/ArtemisMessagingTests.kt    |   3 +-
 5 files changed, 167 insertions(+), 89 deletions(-)

diff --git a/core/src/main/kotlin/com/r3corda/core/Utils.kt b/core/src/main/kotlin/com/r3corda/core/Utils.kt
index 7d6956a842..b55eccd02c 100644
--- a/core/src/main/kotlin/com/r3corda/core/Utils.kt
+++ b/core/src/main/kotlin/com/r3corda/core/Utils.kt
@@ -159,6 +159,7 @@ class ThreadBox<out T>(val content: T, val lock: ReentrantLock = ReentrantLock())
         check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
         return body(content)
     }
+    fun checkNotLocked() = check(!lock.isHeldByCurrentThread)
 }
 
 /**
diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt
index 5e2305981a..baf378fede 100644
--- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt
+++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt
@@ -8,10 +8,11 @@ import com.r3corda.core.node.NodeInfo
 import com.r3corda.core.node.services.NetworkMapCache
 import com.r3corda.core.node.services.ServiceType
 import com.r3corda.node.services.config.NodeConfiguration
-import com.r3corda.node.services.messaging.ArtemisMessagingClient
 import com.r3corda.node.services.config.NodeConfigurationFromConfig
+import com.r3corda.node.services.messaging.ArtemisMessagingClient
 import com.r3corda.node.services.network.InMemoryNetworkMapCache
 import com.r3corda.node.services.network.NetworkMapService
+import com.r3corda.node.utilities.AffinityExecutor
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigRenderOptions
 import org.slf4j.Logger
@@ -207,7 +208,8 @@ class DriverDSL(
             Paths.get(baseDirectory, "driver-artemis"),
             driverNodeConfiguration,
             serverHostPort = networkMapAddress,
-            myHostPort = portAllocation.nextHostAndPort()
+            myHostPort = portAllocation.nextHostAndPort(),
+            executor = AffinityExecutor.ServiceAffinityExecutor("Client thread", 1)
     )
 
     var messagingServiceStarted = false
@@ -220,9 +222,7 @@
     }
 
     override fun shutdown() {
-        registeredProcesses.forEach {
-            it.destroy()
-        }
+        registeredProcesses.forEach(Process::destroy)
         /** Wait 5 seconds, then [Process.destroyForcibly] */
         val finishedFuture = Executors.newSingleThreadExecutor().submit {
             waitForAllNodesToFinish()
@@ -235,9 +235,8 @@
                 it.destroyForcibly()
             }
         }
-        if (messagingServiceStarted){
+        if (messagingServiceStarted)
             messagingService.stop()
-        }
 
         // Check that we shut down properly
         addressMustNotBeBound(messagingService.myHostPort)
@@ -361,7 +360,7 @@
     ): Process {
         // Write node.conf
-        writeConfig("${cliParams.baseDirectory}", "node.conf", config)
+        writeConfig(cliParams.baseDirectory, "node.conf", config)
 
         val className = NodeRunner::class.java.canonicalName
         val separator = System.getProperty("file.separator")
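The checkNotLocked() addition above completes the ThreadBox trio: locked() acquires the lock around a block, alreadyLocked() asserts the lock is held, and the new checkNotLocked() asserts it is not. A minimal sketch of the idiom with a hypothetical MessageCounter class (illustrative only, not code from this patch):

    import com.r3corda.core.ThreadBox

    // Mutable state lives inside a private InnerState reachable only through the ThreadBox,
    // so every access is forced through one of the lock-aware accessors.
    class MessageCounter {
        private class InnerState {
            var delivered = 0
        }
        private val state = ThreadBox(InnerState())

        fun record() = state.locked { delivered++ }               // acquires the lock around the block
        fun recordNested() = state.alreadyLocked { delivered++ }  // asserts the lock is already held

        fun invokeCallbacks(callbacks: List<() -> Unit>) {
            state.checkNotLocked()   // the new assertion: we must NOT hold the lock before calling out
            callbacks.forEach { it() }
        }
    }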
diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt
index fe5ca572b1..bf70df9253 100644
--- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt
+++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt
@@ -2,6 +2,7 @@ package com.r3corda.node.internal
 
 import com.codahale.metrics.JmxReporter
 import com.google.common.net.HostAndPort
+import com.google.common.util.concurrent.MoreExecutors
 import com.r3corda.core.node.NodeInfo
 import com.r3corda.core.node.ServiceHub
 import com.r3corda.core.node.services.ServiceType
@@ -31,12 +32,12 @@ import java.net.InetSocketAddress
 import java.nio.channels.FileLock
 import java.nio.file.Path
 import java.time.Clock
+import java.util.concurrent.TimeUnit
 import javax.management.ObjectName
+import kotlin.concurrent.thread
 
 class ConfigurationException(message: String) : Exception(message)
 
-// TODO: Split this into a regression testing environment
-
 /**
  * A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
  * loads important data off disk and starts listening for connections.
@@ -63,6 +64,43 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
 
     override val log = loggerFor<Node>()
 
+    // DISCUSSION
+    //
+    // We use a single server thread for now, which means all message handling is serialized.
+    //
+    // Writing thread safe code is hard. In this project we are writing most node services and code to be thread safe, but
+    // the possibility of mistakes is always present. Thus we make a deliberate decision here to trade off some multi-core
+    // scalability in order to gain developer productivity by setting the size of the serverThread pool to one, which will
+    // reduce the number of threading bugs we will need to tackle.
+    //
+    // This leaves us with four possibilities in future:
+    //
+    // (1) We discover that processing messages is fast and that our eventual use cases do not need very high
+    //     processing rates. We have benefited from the higher productivity and not lost anything.
+    //
+    // (2) We discover that we need greater multi-core scalability, but that the bulk of our time goes into particular CPU
+    //     hotspots that are easily multi-threaded, e.g. signature checking. We successfully multi-thread those hotspots
+    //     and find that our software now scales sufficiently well to satisfy our users' needs.
+    //
+    // (3) We discover that it wasn't enough, but that we only need to run some messages in parallel and that the bulk of
+    //     the work can stay single threaded. For example, perhaps we find that latency-sensitive UI requests must be handled
+    //     on a separate thread pool where long blocking operations are not allowed, but that the bulk of the heavy lifting
+    //     can stay single threaded. In this case we would need a separate thread pool, but we still minimise the amount of
+    //     thread safe code we need to write and test.
+    //
+    // (4) None of the above are sufficient and we need to run all messages in parallel to get maximum (single machine)
+    //     scalability and fully saturate all cores. In that case we can go fully free-threaded, e.g. change the number '1'
+    //     below to some multiple of the core count. Alternatively, we could use the ForkJoinPool and let it figure out the
+    //     right number of threads by itself. This will require some investment in stress testing to build confidence that we
+    //     haven't made any mistakes, but it will only be necessary if eventual deployment scenarios demand it.
+    //
+    // Note that the messaging subsystem schedules work onto this thread in a blocking manner. That means if the server
+    // thread becomes too slow and a backlog of work starts to build up, it propagates back into the messaging layer,
+    // which can then react to the backpressure. Artemis MQ in particular knows how to do flow control by paging
+    // messages to disk rather than letting us run out of RAM.
+    //
+    // The primary work done by the server thread is the execution of protocol logics and related
+    // serialisation/deserialisation work.
     override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
 
     lateinit var webServer: Server
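The discussion above rests on one property: a pool of size one serializes every task submitted to it, so state confined to that thread needs no locks, and a blocking submit is how callers feel backpressure. A JDK-only sketch of that property (illustrative, not part of the patch):

    import java.util.concurrent.Executors

    fun main() {
        val serverThread = Executors.newSingleThreadExecutor()
        var counter = 0  // confined to the single executor thread, so no lock is needed
        val futures = (1..10_000).map { serverThread.submit { counter++ } }
        futures.forEach { it.get() }  // blocking get(): slow tasks slow the submitter down too
        println(counter)              // always 10000; tasks ran strictly one after another
        serverThread.shutdown()
    }

Moving to option (4) would be a one-character change to the pool size above; the expensive part is auditing every handler for thread safety afterwards.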
@@ -180,6 +218,11 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
                 }.
                 build().
                 start()
+
+        Runtime.getRuntime().addShutdownHook(thread(start = false) {
+            stop()
+        })
+
         return this
     }
 
@@ -188,12 +231,23 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
         return this
     }
 
+    private var shutdown = false
+
     override fun stop() {
+        synchronized(this) {
+            if (shutdown) return
+            shutdown = true
+        }
+        log.info("Shutting down ...")
+        // Shut down the web server.
         webServer.stop()
+        // Terminate the messaging system. This will block until messages that are in-flight have finished being
+        // processed, so it may take a moment.
         super.stop()
+        MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS)
         messageBroker?.stop()
         nodeFileLock!!.release()
-        serverThread.shutdownNow()
+        log.info("Shutdown complete")
     }
 
     private fun alreadyRunningNodeCheck() {
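stop() can now be reached twice, once from the user and once more from the JVM shutdown hook, so the synchronized flag above makes the second call a no-op. The shape of that pattern in isolation (hypothetical Service class, not code from this patch):

    import kotlin.concurrent.thread

    class Service {
        private var shutdown = false

        fun start(): Service {
            // thread(start = false) builds an unstarted Thread; the JVM runs it at exit.
            Runtime.getRuntime().addShutdownHook(thread(start = false) { stop() })
            return this
        }

        fun stop() {
            synchronized(this) {
                if (shutdown) return  // e.g. the hook firing after an explicit stop()
                shutdown = true
            }
            println("releasing resources exactly once")
        }
    }

    fun main() {
        Service().start().stop()  // the hook later finds shutdown == true and does nothing
    }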
diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt
index 7b35d4bcdc..1ecb3d1a49 100644
--- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt
+++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt
@@ -1,13 +1,15 @@
 package com.r3corda.node.services.messaging
 
 import com.google.common.net.HostAndPort
-import com.r3corda.core.RunOnCallerThread
 import com.r3corda.core.ThreadBox
 import com.r3corda.core.messaging.*
+import com.r3corda.core.serialization.opaque
 import com.r3corda.core.utilities.loggerFor
 import com.r3corda.node.internal.Node
 import com.r3corda.node.services.api.MessagingServiceInternal
 import com.r3corda.node.services.config.NodeConfiguration
+import com.r3corda.node.utilities.AffinityExecutor
+import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
 import org.apache.activemq.artemis.api.core.SimpleString
 import org.apache.activemq.artemis.api.core.client.*
 import java.nio.file.FileSystems
@@ -20,18 +22,22 @@ import javax.annotation.concurrent.ThreadSafe
 
 /**
  * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
- * Artemis is a message queue broker and here we run a client connecting to the specified broker instance [ArtemisMessagingServer]
+ * Artemis is a message queue broker and here we run a client connecting to the specified broker instance
+ * [ArtemisMessagingServer].
+ *
+ * Message handlers are run on the provided [AffinityExecutor] synchronously, that is, the Artemis callback threads
+ * are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given executor
+ * through into Artemis and from there, back through to senders.
  *
 * @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
 * @param myHostPort What host and port to use as an address for incoming messages
- * @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified.
 */
 @ThreadSafe
 class ArtemisMessagingClient(directory: Path,
                              config: NodeConfiguration,
                              val serverHostPort: HostAndPort,
                              val myHostPort: HostAndPort,
-                             val defaultExecutor: Executor = RunOnCallerThread) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
+                             val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
 
     companion object {
         val log = loggerFor<ArtemisMessagingClient>()
@@ -53,6 +59,9 @@ class ArtemisMessagingClient(directory: Path,
     private class InnerState {
         var running = false
        val producers = HashMap<Address, ClientProducer>()
+        var consumer: ClientConsumer? = null
+        var session: ClientSession? = null
+        var clientFactory: ClientSessionFactory? = null
     }
 
     /** A registration to handle messages of different types */
@@ -62,14 +71,9 @@ class ArtemisMessagingClient(directory: Path,
 
     override val myAddress: SingleMessageRecipient = Address(myHostPort)
 
-    private val mutex = ThreadBox(InnerState())
+    private val state = ThreadBox(InnerState())
     private val handlers = CopyOnWriteArrayList<Handler>()
 
-    private var serverLocator: ServerLocator? = null
-    private var clientFactory: ClientSessionFactory? = null
-    private var session: ClientSession? = null
-    private var consumer: ClientConsumer? = null
-
     // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
     private val undeliveredMessages = CopyOnWriteArrayList<Message>()
 
@@ -77,43 +81,42 @@ class ArtemisMessagingClient(directory: Path,
         require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
     }
 
-    fun start() = mutex.locked {
-        if (!running) {
-            configureAndStartClient()
+    fun start() {
+        state.locked {
+            check(!running)
             running = true
+
+            log.info("Connecting to server: $serverHostPort")
+            // Connect to our server.
+            val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)
+            val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
+            clientFactory = locator.createSessionFactory()
+
+            // Create a queue on which to receive messages and set up the handler.
+            val session = clientFactory!!.createSession()
+            this.session = session
+
+            val address = myHostPort.toString()
+            val queueName = myHostPort.toString()
+            session.createQueue(address, queueName, false)
+            consumer = session.createConsumer(queueName).setMessageHandler { artemisMessage: ClientMessage ->
+                val message: Message? = artemisToCordaMessage(artemisMessage)
+                if (message != null)
+                    deliver(message)
+            }
+            session.start()
        }
     }
 
-    private fun configureAndStartClient() {
-        log.info("Connecting to server: $serverHostPort")
-        // Connect to our server.
-        val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(
-                tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port))
-        this.serverLocator = serverLocator
-        val clientFactory = serverLocator.createSessionFactory()
-        this.clientFactory = clientFactory
-
-        // Create a queue on which to receive messages and set up the handler.
-        val session = clientFactory.createSession()
-        this.session = session
-
-        val address = myHostPort.toString()
-        val queueName = myHostPort.toString()
-        session.createQueue(address, queueName, false)
-        consumer = session.createConsumer(queueName).setMessageHandler { message: ClientMessage -> handleIncomingMessage(message) }
-        session.start()
-    }
-
-    private fun handleIncomingMessage(message: ClientMessage) {
-        // This code runs for every inbound message.
+    private fun artemisToCordaMessage(message: ClientMessage): Message? {
         try {
             if (!message.containsProperty(TOPIC_PROPERTY)) {
                 log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
-                return
+                return null
             }
             if (!message.containsProperty(SESSION_ID_PROPERTY)) {
                 log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
-                return
+                return null
             }
             val topic = message.getStringProperty(TOPIC_PROPERTY)
             val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
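The rewritten start() drives the standard Artemis core-client lifecycle: server locator, session factory, session, queue, consumer, then session.start(). The same sequence as a standalone sketch, assuming a broker listening on localhost:10001 (illustrative, not code from this patch):

    import org.apache.activemq.artemis.api.core.TransportConfiguration
    import org.apache.activemq.artemis.api.core.client.ActiveMQClient
    import org.apache.activemq.artemis.api.core.client.ClientMessage
    import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory

    fun main() {
        val transport = TransportConfiguration(NettyConnectorFactory::class.java.name,
                mapOf("host" to "localhost", "port" to "10001"))
        val locator = ActiveMQClient.createServerLocatorWithoutHA(transport)
        val factory = locator.createSessionFactory()
        val session = factory.createSession()
        session.createQueue("example.address", "example.queue", false /* durable */)
        // The handler runs on an Artemis callback thread. Blocking in here blocks Artemis,
        // which is exactly the property the client above uses to propagate backpressure.
        session.createConsumer("example.queue").setMessageHandler { m: ClientMessage ->
            println("received ${m.bodySize} bytes")
            m.acknowledge()
        }
        session.start()
        // ... run until done ...
        factory.close()  // closes the sessions it produced as well
    }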
@@ -126,18 +129,17 @@ class ArtemisMessagingClient(directory: Path,
                 override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
                 override val debugMessageID: String = message.messageID.toString()
                 override fun serialise(): ByteArray = body
-                override fun toString() = topic + "#" + String(data)
+                override fun toString() = topic + "#" + data.opaque()
             }
-            deliverMessage(msg)
-        } finally {
-            // TODO the message is delivered onto an executor and so we may be acking the message before we've
-            // finished processing it
-            message.acknowledge()
+            return msg
+        } catch (e: Exception) {
+            log.error("Internal error whilst reading MQ message", e)
+            return null
         }
     }
 
-    private fun deliverMessage(msg: Message): Boolean {
+    private fun deliver(msg: Message): Boolean {
         // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
         // or removed whilst the filter is executing will not affect anything.
         val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
@@ -155,57 +157,78 @@
         }
 
         for (handler in deliverTo) {
-            (handler.executor ?: defaultExecutor).execute {
-                try {
+            try {
+                // This will perform a BLOCKING call onto the executor, although we are not actually 'fetching'
+                // anything from the thread as the callbacks don't return anything. Thus if the handlers are slow,
+                // we will be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
+                // directly in order to ensure that we have the features of the AffinityExecutor class throughout
+                // the bulk of the codebase.
+                //
+                // Note that handlers may re-enter this class. We aren't holding any locks at this point, so that's OK.
+                state.checkNotLocked()
+                executor.fetchFrom {
                     handler.callback(msg, handler)
-                } catch(e: Exception) {
-                    log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
                 }
+            } catch(e: Exception) {
+                log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
             }
         }
         return true
     }
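deliver() above hands each message to the executor with fetchFrom, a submit-and-wait: the Artemis callback thread parks until the handler finishes, which is what turns a slow handler into backpressure rather than an unbounded queue. A JDK-only stand-in for that behaviour (the real AffinityExecutor lives in the node's utilities and does more than this):

    import java.util.concurrent.Callable
    import java.util.concurrent.ExecutorService
    import java.util.concurrent.Executors

    // Stand-in: run the block on the executor, block the caller until it completes, return the result.
    fun <T> ExecutorService.fetchFrom(block: () -> T): T = submit(Callable { block() }).get()

    fun main() {
        val serverThread = Executors.newSingleThreadExecutor()
        val answer = serverThread.fetchFrom {
            Thread.sleep(100)  // a slow handler makes the submitting (Artemis) thread wait too
            42
        }
        println(answer)  // prints 42 only after the handler has completed
        serverThread.shutdown()
    }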
 
-    override fun stop() = mutex.locked {
-        for (producer in producers.values) producer.close()
-        producers.clear()
-        consumer?.close()
-        session?.close()
-        clientFactory?.close()
-        serverLocator?.close()
-        // We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
-        running = false
+    override fun stop() {
+        state.locked {
+            if (clientFactory == null)
+                return    // Was never started to begin with, so just ignore.
+
+            // Setting the message handler to null here will block until there are no more threads running the
+            // handler, so once we come back we know we can close the consumer and no more messages are being
+            // processed anywhere, due to the blocking delivery.
+            try {
+                consumer?.messageHandler = null
+                consumer?.close()
+            } catch(e: ActiveMQObjectClosedException) {
+                // Ignore it: this can happen if the server has gone away before we do.
+            }
+            consumer = null
+
+            for (producer in producers.values) producer.close()
+            producers.clear()
+
+            // Closing the factory closes all the sessions it produced as well.
+            clientFactory!!.close()
+            clientFactory = null
+        }
     }
 
     override fun send(message: Message, target: MessageRecipients) {
         if (target !is Address)
             TODO("Only simple sends to single recipients are currently implemented")
-        val artemisMessage = session!!.createMessage(true).apply {
-            val sessionID = message.topicSession.sessionID
-            putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
-            putLongProperty(SESSION_ID_PROPERTY, sessionID)
-            writeBodyBufferBytes(message.data)
-        }
-        getProducerForAddress(target).send(artemisMessage)
-    }
 
-    private fun getProducerForAddress(address: Address): ClientProducer {
-        return mutex.locked {
-            producers.getOrPut(address) {
-                if (address != myAddress) {
-                    maybeCreateQueue(address.hostAndPort)
-                }
-                session!!.createProducer(address.hostAndPort.toString())
+        state.locked {
+            val artemisMessage = session!!.createMessage(true).apply {
+                val sessionID = message.topicSession.sessionID
+                putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
+                putLongProperty(SESSION_ID_PROPERTY, sessionID)
+                writeBodyBufferBytes(message.data)
             }
+
+            val producer = producers.getOrPut(target) {
+                if (target != myAddress)
+                    maybeCreateQueue(target.hostAndPort)
+                session!!.createProducer(target.hostAndPort.toString())
+            }
+            producer.send(artemisMessage)
         }
     }
 
     private fun maybeCreateQueue(hostAndPort: HostAndPort) {
-        val name = hostAndPort.toString()
-        val queueQuery = session!!.queueQuery(SimpleString(name))
-        if (!queueQuery.isExists) {
-            session!!.createQueue(name, name, true /* durable */)
+        state.alreadyLocked {
+            val name = hostAndPort.toString()
+            val queueQuery = session!!.queueQuery(SimpleString(name))
+            if (!queueQuery.isExists) {
+                session!!.createQueue(name, name, true /* durable */)
+            }
         }
     }
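send() above creates one ClientProducer per destination and caches it with getOrPut, so the queue-existence check and producer creation run once per target; and because the whole body executes inside state.locked, maybeCreateQueue can assert alreadyLocked instead of re-acquiring the lock. The caching idiom in isolation (illustrative, not code from this patch):

    fun main() {
        val producers = HashMap<String, String>()  // stand-in for HashMap<Address, ClientProducer>
        fun producerFor(target: String) = producers.getOrPut(target) {
            println("creating producer for $target")  // the factory lambda runs only on a cache miss
            "producer->$target"
        }
        producerFor("alice:10001")
        producerFor("alice:10001")  // cache hit: no second "creating" line
        producerFor("bob:10002")
    }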
@@ -219,7 +242,7 @@ class ArtemisMessagingClient(directory: Path,
         require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
         val handler = Handler(executor, topicSession, callback)
         handlers.add(handler)
-        undeliveredMessages.removeIf { deliverMessage(it) }
+        undeliveredMessages.removeIf { deliver(it) }
         return handler
     }
 
diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt
index 0ca66bef07..389b8674cc 100644
--- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt
+++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt
@@ -7,6 +7,7 @@ import com.r3corda.core.testing.freeLocalHostAndPort
 import com.r3corda.node.services.config.NodeConfiguration
 import com.r3corda.node.services.messaging.ArtemisMessagingClient
 import com.r3corda.node.services.messaging.ArtemisMessagingServer
+import com.r3corda.node.utilities.AffinityExecutor
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.After
 import org.junit.Rule
@@ -99,7 +100,7 @@ class ArtemisMessagingTests {
 
     private fun createMessagingClient(server: HostAndPort = hostAndPort,
                                       local: HostAndPort = hostAndPort): ArtemisMessagingClient {
-        return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local).apply {
+        return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply {
             configureWithDevSSLCertificate()
             messagingClient = this
         }
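Finally, the removeIf in the handler-registration hunk above retries and prunes in a single pass: queued messages that the newly registered handler accepts are delivered and removed, while the rest stay queued for a future handler. The idiom in isolation (illustrative, not code from this patch):

    import java.util.concurrent.CopyOnWriteArrayList

    fun main() {
        val undelivered = CopyOnWriteArrayList(listOf(1, 2, 3, 4))
        fun deliver(msg: Int) = msg % 2 == 0  // pretend only even messages now match a handler
        undelivered.removeIf { deliver(it) }
        println(undelivered)  // [1, 3] remain queued
    }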