From cba0427e01b45bffbabf8a59b3c201be3b48b482 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Fri, 12 Aug 2016 13:56:21 +0200 Subject: [PATCH] Make ArtemisMessagingClient blocking and require the user to directly enter a message loop. This cleans up a few things and ensures we can't get caught out by messages being arbitrarily re-ordered as they pass through any Artemis thread pools. --- node/src/main/kotlin/com/r3corda/node/Main.kt | 10 +- .../kotlin/com/r3corda/node/driver/Driver.kt | 2 + .../com/r3corda/node/driver/NodeRunner.kt | 3 +- .../kotlin/com/r3corda/node/internal/Node.kt | 10 ++ .../node/services/api/ServiceHubInternal.kt | 6 ++ .../messaging/ArtemisMessagingClient.kt | 95 ++++++++++++++----- .../node/utilities/AffinityExecutor.kt | 1 + .../node/services/ArtemisMessagingTests.kt | 12 +-- src/main/kotlin/com/r3corda/demos/IRSDemo.kt | 8 +- .../kotlin/com/r3corda/demos/TraderDemo.kt | 43 ++++----- 10 files changed, 123 insertions(+), 67 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/Main.kt b/node/src/main/kotlin/com/r3corda/node/Main.kt index cee6ae1514..52d1392254 100644 --- a/node/src/main/kotlin/com/r3corda/node/Main.kt +++ b/node/src/main/kotlin/com/r3corda/node/Main.kt @@ -42,18 +42,12 @@ fun main(args: Array) { try { val dirFile = dir.toFile() - if (!dirFile.exists()) { + if (!dirFile.exists()) dirFile.mkdirs() - } val node = conf.createNode() node.start() - try { - // TODO create a proper daemon and/or provide some console handler to give interactive commands - while (true) Thread.sleep(Long.MAX_VALUE) - } catch(e: InterruptedException) { - node.stop() - } + node.run() } catch (e: Exception) { log.error("Exception during node startup", e) System.exit(1) diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index baf378fede..51e1ddd2e0 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt 
@@ -27,6 +27,7 @@ import java.util.* import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException +import kotlin.concurrent.thread /** * This file defines a small "Driver" DSL for starting up nodes. @@ -292,6 +293,7 @@ class DriverDSL( startNetworkMapService() messagingService.configureWithDevSSLCertificate() messagingService.start() + thread { messagingService.run() } messagingServiceStarted = true // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // the network map service itself diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index c8d90a5c2e..1f96d50b12 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -8,8 +8,8 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.node.internal.Node import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.config.NodeConfigurationFromConfig +import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import joptsimple.ArgumentAcceptingOptionSpec import joptsimple.OptionParser @@ -64,6 +64,7 @@ class NodeRunner { log.info("Starting ${nodeConfiguration.myLegalName} with services $services on addresses $messagingAddress and $apiAddress") node.start() + node.run() } } } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index bf70df9253..245811f0fb 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -226,6 +226,12 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val 
webServerAddr: HostAndPort, return this } + /** Starts a blocking event loop for message dispatch. */ + fun run() { + (net as ArtemisMessagingClient).run() + } + + // TODO: Do we really need setup? override fun setup(): Node { super.setup() return this @@ -234,6 +240,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, private var shutdown = false override fun stop() { + check(!serverThread.isOnThread) synchronized(this) { if (shutdown) return shutdown = true @@ -244,6 +251,9 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, // Terminate the messaging system. This will block until messages that are in-flight have finished being // processed so it may take a moment. super.stop() + // We do another wait here, even though any in-flight messages have been drained away now because the + // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is + // arbitrary and might be inappropriate. MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS) messageBroker?.stop() nodeFileLock!!.release() diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt index 3d5ce9c33e..0937dc5130 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt @@ -9,6 +9,12 @@ import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory interface MessagingServiceInternal: MessagingService { + /** + * Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor + * then this will block until all in-flight messages have finished being handled and acknowledged. 
If called + * from a thread that's a part of the [AffinityExecutor] given to the constructor, it returns immediately and + * shutdown is asynchronous. + */ fun stop() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt index 1ecb3d1a49..658b1aa2e2 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt @@ -17,6 +17,7 @@ import java.nio.file.Path import java.time.Instant import java.util.* import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.CountDownLatch import java.util.concurrent.Executor import javax.annotation.concurrent.ThreadSafe @@ -57,6 +58,7 @@ class ArtemisMessagingClient(directory: Path, } private class InnerState { + var started = false var running = false val producers = HashMap() var consumer: ClientConsumer? = null @@ -83,8 +85,8 @@ class ArtemisMessagingClient(directory: Path, fun start() { state.locked { - check(!running) - running = true + check(!started) { "start can't be called twice" } + started = true log.info("Connecting to server: $serverHostPort") // Connect to our server. @@ -99,15 +101,59 @@ class ArtemisMessagingClient(directory: Path, val address = myHostPort.toString() val queueName = myHostPort.toString() session.createQueue(address, queueName, false) - consumer = session.createConsumer(queueName).setMessageHandler { artemisMessage: ClientMessage -> - val message: Message? = artemisToCordaMessage(artemisMessage) - if (message != null) - deliver(message) - } + consumer = session.createConsumer(queueName) session.start() } } + private var shutdownLatch = CountDownLatch(1) + + /** Starts the event loop: this method only returns once [stop] has been called. 
*/ + fun run() { + val consumer = state.locked { + check(started) + check(!running) { "run can't be called twice" } + running = true + consumer!! + } + + while (true) { + // Two possibilities here: + // + // 1. We block waiting for a message and the consumer is closed in another thread. In this case + // receive returns null and we break out of the loop. + // 2. We receive a message and process it, and stop() is called during delivery. In this case, + // calling receive will throw and we break out of the loop. + // + // It's safe to call into receive simultaneously with other threads calling send on a producer. + val artemisMessage: ClientMessage = try { + consumer.receive() + } catch(e: ActiveMQObjectClosedException) { + null + } ?: break + + val message: Message? = artemisToCordaMessage(artemisMessage) + if (message != null) + deliver(message) + + // Ack the message so it won't be redelivered. We should only really do this when there were no + // transient failures. If we caught an exception in the handler, we could back off and retry delivery + // a few times before giving up and redirecting the message to a dead-letter address for admin or + // developer inspection. Artemis has the features to do this for us, we just need to enable them. + // + // TODO: Set up Artemis delayed redelivery and dead letter addresses. + // + // ACKing a message calls back into the session which isn't thread safe, so we have to ensure it + // doesn't collide with a send here. Note that stop() could have been called whilst we were + // processing a message but if so, it'll be parked waiting for us to count down the latch, so + // the session itself is still around and we can still ack messages as a result. + state.locked { + artemisMessage.acknowledge() + } + } + shutdownLatch.countDown() + } + private fun artemisToCordaMessage(message: ClientMessage): Message? 
{ try { if (!message.containsProperty(TOPIC_PROPERTY)) { @@ -140,6 +186,7 @@ class ArtemisMessagingClient(directory: Path, } private fun deliver(msg: Message): Boolean { + state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } @@ -158,13 +205,14 @@ class ArtemisMessagingClient(directory: Path, for (handler in deliverTo) { try { - // This will perform a BLOCKING call onto the executor, although we are not actually 'fetching' anything - // from the thread as the callbacks don't return anything. Thus if the handlers are slow, we will be slow, - // and Artemis can handle that case intelligently. We don't just invoke the handler directly in order to - // ensure that we have the features of the AffinityExecutor class throughout the bulk of the codebase. + // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will + // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler + // directly in order to ensure that we have the features of the AffinityExecutor class throughout + // the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor + // easily. // - // Note that handlers may re-enter this class. We aren't holding any locks at this point, so that's OK. - state.checkNotLocked() + // Note that handlers may re-enter this class. We aren't holding any locks and methods like + // start/run/stop have re-entrancy assertions at the top, so it is OK. executor.fetchFrom { handler.callback(msg, handler) } @@ -177,21 +225,24 @@ class ArtemisMessagingClient(directory: Path, } override fun stop() { - state.locked { - if (clientFactory == null) - return // Was never started to begin with, so just ignore. 
+ val running = state.locked { + // We allow stop() to be called without a run() in between, but it must have at least been started. + check(started) - // Setting the message handler to null here will block until there are no more threads running the handler, - // so once we come back we know we can close the consumer and no more messages are being processed - // anywhere, due to the blocking delivery. + val c = consumer ?: throw IllegalStateException("stop can't be called twice") try { - consumer?.messageHandler = null - consumer?.close() + c.close() } catch(e: ActiveMQObjectClosedException) { // Ignore it: this can happen if the server has gone away before we do. } consumer = null - + running + } + if (running && !executor.isOnThread) { + // Wait for the main loop to notice the consumer has gone and finish up. + shutdownLatch.await() + } + state.locked { for (producer in producers.values) producer.close() producers.clear() diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/AffinityExecutor.kt b/node/src/main/kotlin/com/r3corda/node/utilities/AffinityExecutor.kt index a27436b6b6..d427704c95 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/AffinityExecutor.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/AffinityExecutor.kt @@ -28,6 +28,7 @@ interface AffinityExecutor : Executor { execute(runnable) } + // TODO: Rename this to executeWithResult /** * Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this * way! Make sure the executor can't possibly be waiting for the calling thread. 
diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt index 389b8674cc..aab6e5587f 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -16,9 +16,8 @@ import org.junit.rules.TemporaryFolder import java.net.ServerSocket import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit.MILLISECONDS -import java.util.concurrent.TimeUnit.SECONDS +import kotlin.concurrent.thread import kotlin.test.assertEquals -import kotlin.test.assertNotNull import kotlin.test.assertNull class ArtemisMessagingTests { @@ -66,8 +65,9 @@ class ArtemisMessagingTests { createMessagingServer(serverAddress).start() - val messagingClient = createMessagingClient(server = invalidServerAddress) - assertThatThrownBy { messagingClient.start() } + messagingClient = createMessagingClient(server = invalidServerAddress) + assertThatThrownBy { messagingClient!!.start() } + messagingClient = null } @Test @@ -84,6 +84,7 @@ class ArtemisMessagingTests { val messagingClient = createMessagingClient() messagingClient.start() + thread { messagingClient.run() } messagingClient.addMessageHandler(topic) { message, r -> receivedMessages.add(message) @@ -92,8 +93,7 @@ class ArtemisMessagingTests { val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) messagingClient.send(message, messagingClient.myAddress) - val actual = receivedMessages.poll(2, SECONDS) - assertNotNull(actual) + val actual: Message = receivedMessages.take() assertEquals("first msg", String(actual.data)) assertNull(receivedMessages.poll(200, MILLISECONDS)) } diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index d520949fd5..0d72427503 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ 
b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -336,11 +336,7 @@ private fun runNode(cliParams: CliParams.RunNode): Int { runUploadRates(cliParams.apiAddress) } - try { - while (true) Thread.sleep(Long.MAX_VALUE) - } catch(e: InterruptedException) { - node.stop() - } + node.run() } catch (e: NotSetupException) { log.error(e.message) return 1 @@ -397,7 +393,7 @@ private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipi } val node = logElapsedTime("Node startup", log) { - Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start() + Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).setup().start() } return node diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index c61cee426a..8f204904b1 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -2,22 +2,20 @@ package com.r3corda.demos import co.paralleluniverse.fibers.Suspendable import com.google.common.net.HostAndPort +import com.google.common.util.concurrent.ListenableFuture import com.r3corda.contracts.CommercialPaper import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER import com.r3corda.contracts.asset.cashBalances import com.r3corda.contracts.testing.fillWithSomeTestCash +import com.r3corda.core.* import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.generateKeyPair -import com.r3corda.core.days -import com.r3corda.core.logElapsedTime import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.ServiceType import com.r3corda.core.protocols.ProtocolLogic -import com.r3corda.core.random63BitValue -import com.r3corda.core.seconds import com.r3corda.core.serialization.deserialize 
import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.LogHelper @@ -40,7 +38,7 @@ import java.nio.file.Paths import java.security.PublicKey import java.time.Instant import java.util.* -import java.util.concurrent.CountDownLatch +import kotlin.concurrent.thread import kotlin.system.exitProcess import kotlin.test.assertEquals @@ -73,10 +71,6 @@ val DEFAULT_BASE_DIRECTORY = "./build/trader-demo" private val log: Logger = LoggerFactory.getLogger("TraderDemo") fun main(args: Array) { - exitProcess(runTraderDemo(args)) -} - -fun runTraderDemo(args: Array): Int { val parser = OptionParser() val roleArg = parser.accepts("role").withRequiredArg().ofType(Role::class.java).required() @@ -90,7 +84,7 @@ fun runTraderDemo(args: Array): Int { } catch (e: Exception) { log.error(e.message) printHelp(parser) - return 1 + exitProcess(1) } val role = options.valueOf(roleArg)!! @@ -160,12 +154,13 @@ fun runTraderDemo(args: Array): Int { if (role == Role.BUYER) { runBuyer(node, amount) } else { - node.networkMapRegistrationFuture.get() - val party = node.netMapCache.getNodeByLegalName("Bank A")?.identity ?: throw IllegalStateException("Cannot find other node?!") - runSeller(node, amount, party) + node.networkMapRegistrationFuture.success { + val party = node.netMapCache.getNodeByLegalName("Bank A")?.identity ?: throw IllegalStateException("Cannot find other node?!") + runSeller(node, amount, party) + } } - return 0 + node.run() } private fun runSeller(node: Node, amount: Amount, otherSide: Party) { @@ -182,18 +177,20 @@ private fun runSeller(node: Node, amount: Amount, otherSide: Party) { } } - var tradeTX: SignedTransaction? 
= null + val tradeTX: ListenableFuture if (node.isPreviousCheckpointsPresent) { - node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach { - tradeTX = it.second.get() - } + tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second } else { val seller = TraderDemoProtocolSeller(otherSide, amount) - tradeTX = node.smm.add("demo.seller", seller).get() + tradeTX = node.smm.add("demo.seller", seller) } - println("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(tradeTX!!.tx)}") - node.stop() + tradeTX.success { + println("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(it.tx)}") + thread { + node.stop() + } + } } private fun runBuyer(node: Node, amount: Amount) { @@ -222,13 +219,11 @@ private fun runBuyer(node: Node, amount: Amount) { val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount) node.smm.add("demo.buyer", buyer) } - - CountDownLatch(1).await() // Prevent the application from terminating } // We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic. -val DEMO_TOPIC = "initiate.demo.trade" +private val DEMO_TOPIC = "initiate.demo.trade" private class TraderDemoProtocolBuyer(val otherSide: Party, private val attachmentsPath: Path,