Make ArtemisMessagingClient blocking and require the user to directly enter a message loop. This cleans up a few things and ensures we can't get caught out by messages being arbitrarily re-ordered as they pass through any Artemis thread pools.

This commit is contained in:
Mike Hearn
2016-08-12 13:56:21 +02:00
parent ac81d2aa32
commit cba0427e01
10 changed files with 123 additions and 67 deletions

View File

@ -42,18 +42,12 @@ fun main(args: Array<String>) {
try { try {
val dirFile = dir.toFile() val dirFile = dir.toFile()
if (!dirFile.exists()) { if (!dirFile.exists())
dirFile.mkdirs() dirFile.mkdirs()
}
val node = conf.createNode() val node = conf.createNode()
node.start() node.start()
try { node.run()
// TODO create a proper daemon and/or provide some console handler to give interactive commands
while (true) Thread.sleep(Long.MAX_VALUE)
} catch(e: InterruptedException) {
node.stop()
}
} catch (e: Exception) { } catch (e: Exception) {
log.error("Exception during node startup", e) log.error("Exception during node startup", e)
System.exit(1) System.exit(1)

View File

@ -27,6 +27,7 @@ import java.util.*
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import kotlin.concurrent.thread
/** /**
* This file defines a small "Driver" DSL for starting up nodes. * This file defines a small "Driver" DSL for starting up nodes.
@ -292,6 +293,7 @@ class DriverDSL(
startNetworkMapService() startNetworkMapService()
messagingService.configureWithDevSSLCertificate() messagingService.configureWithDevSSLCertificate()
messagingService.start() messagingService.start()
thread { messagingService.run() }
messagingServiceStarted = true messagingServiceStarted = true
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
// the network map service itself // the network map service itself

View File

@ -8,8 +8,8 @@ import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import joptsimple.ArgumentAcceptingOptionSpec import joptsimple.ArgumentAcceptingOptionSpec
import joptsimple.OptionParser import joptsimple.OptionParser
@ -64,6 +64,7 @@ class NodeRunner {
log.info("Starting ${nodeConfiguration.myLegalName} with services $services on addresses $messagingAddress and $apiAddress") log.info("Starting ${nodeConfiguration.myLegalName} with services $services on addresses $messagingAddress and $apiAddress")
node.start() node.start()
node.run()
} }
} }
} }

View File

@ -226,6 +226,12 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
return this return this
} }
/** Starts a blocking event loop for message dispatch. */
fun run() {
(net as ArtemisMessagingClient).run()
}
// TODO: Do we really need setup?
override fun setup(): Node { override fun setup(): Node {
super.setup() super.setup()
return this return this
@ -234,6 +240,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
private var shutdown = false private var shutdown = false
override fun stop() { override fun stop() {
check(!serverThread.isOnThread)
synchronized(this) { synchronized(this) {
if (shutdown) return if (shutdown) return
shutdown = true shutdown = true
@ -244,6 +251,9 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
// Terminate the messaging system. This will block until messages that are in-flight have finished being // Terminate the messaging system. This will block until messages that are in-flight have finished being
// processed so it may take a moment. // processed so it may take a moment.
super.stop() super.stop()
// We do another wait here, even though any in-flight messages have been drained away now because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS) MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS)
messageBroker?.stop() messageBroker?.stop()
nodeFileLock!!.release() nodeFileLock!!.release()

View File

@ -9,6 +9,12 @@ import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
interface MessagingServiceInternal: MessagingService { interface MessagingServiceInternal: MessagingService {
/**
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [AffinityExecutor] given to the constructor, it returns immediately and
* shutdown is asynchronous.
*/
fun stop() fun stop()
} }

View File

@ -17,6 +17,7 @@ import java.nio.file.Path
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
@ -57,6 +58,7 @@ class ArtemisMessagingClient(directory: Path,
} }
private class InnerState { private class InnerState {
var started = false
var running = false var running = false
val producers = HashMap<Address, ClientProducer>() val producers = HashMap<Address, ClientProducer>()
var consumer: ClientConsumer? = null var consumer: ClientConsumer? = null
@ -83,8 +85,8 @@ class ArtemisMessagingClient(directory: Path,
fun start() { fun start() {
state.locked { state.locked {
check(!running) check(!started) { "start can't be called twice" }
running = true started = true
log.info("Connecting to server: $serverHostPort") log.info("Connecting to server: $serverHostPort")
// Connect to our server. // Connect to our server.
@ -99,13 +101,57 @@ class ArtemisMessagingClient(directory: Path,
val address = myHostPort.toString() val address = myHostPort.toString()
val queueName = myHostPort.toString() val queueName = myHostPort.toString()
session.createQueue(address, queueName, false) session.createQueue(address, queueName, false)
consumer = session.createConsumer(queueName).setMessageHandler { artemisMessage: ClientMessage -> consumer = session.createConsumer(queueName)
session.start()
}
}
private var shutdownLatch = CountDownLatch(1)
/** Starts the event loop: this method only returns once [stop] has been called. */
fun run() {
val consumer = state.locked {
check(started)
check(!running) { "run can't be called twice" }
running = true
consumer!!
}
while (true) {
// Two possibilities here:
//
// 1. We block waiting for a message and the consumer is closed in another thread. In this case
// receive returns null and we break out of the loop.
// 2. We receive a message and process it, and stop() is called during delivery. In this case,
// calling receive will throw and we break out of the loop.
//
// It's safe to call into receive simultaneous with other threads calling send on a producer.
val artemisMessage: ClientMessage = try {
consumer.receive()
} catch(e: ActiveMQObjectClosedException) {
null
} ?: break
val message: Message? = artemisToCordaMessage(artemisMessage) val message: Message? = artemisToCordaMessage(artemisMessage)
if (message != null) if (message != null)
deliver(message) deliver(message)
// Ack the message so it won't be redelivered. We should only really do this when there were no
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
// a few times before giving up and redirecting the message to a dead-letter address for admin or
// developer inspection. Artemis has the features to do this for us, we just need to enable them.
//
// TODO: Setup Artemis delayed redelivery and dead letter addresses.
//
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
// doesn't collide with a send here. Note that stop() could have been called whilst we were
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
// the session itself is still around and we can still ack messages as a result.
state.locked {
artemisMessage.acknowledge()
} }
session.start()
} }
shutdownLatch.countDown()
} }
private fun artemisToCordaMessage(message: ClientMessage): Message? { private fun artemisToCordaMessage(message: ClientMessage): Message? {
@ -140,6 +186,7 @@ class ArtemisMessagingClient(directory: Path,
} }
private fun deliver(msg: Message): Boolean { private fun deliver(msg: Message): Boolean {
state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything. // or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
@ -158,13 +205,14 @@ class ArtemisMessagingClient(directory: Path,
for (handler in deliverTo) { for (handler in deliverTo) {
try { try {
// This will perform a BLOCKING call onto the executor, although we are not actually 'fetching' anything // This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
// from the thread as the callbacks don't return anything. Thus if the handlers are slow, we will be slow, // be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
// and Artemis can handle that case intelligently. We don't just invoke the handler directly in order to // directly in order to ensure that we have the features of the AffinityExecutor class throughout
// ensure that we have the features of the AffinityExecutor class throughout the bulk of the codebase. // the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor
// easily.
// //
// Note that handlers may re-enter this class. We aren't holding any locks at this point, so that's OK. // Note that handlers may re-enter this class. We aren't holding any locks and methods like
state.checkNotLocked() // start/run/stop have re-entrancy assertions at the top, so it is OK.
executor.fetchFrom { executor.fetchFrom {
handler.callback(msg, handler) handler.callback(msg, handler)
} }
@ -177,21 +225,24 @@ class ArtemisMessagingClient(directory: Path,
} }
override fun stop() { override fun stop() {
state.locked { val running = state.locked {
if (clientFactory == null) // We allow stop() to be called without a run() in between, but it must have at least been started.
return // Was never started to begin with, so just ignore. check(started)
// Setting the message handler to null here will block until there are no more threads running the handler, val c = consumer ?: throw IllegalStateException("stop can't be called twice")
// so once we come back we know we can close the consumer and no more messages are being processed
// anywhere, due to the blocking delivery.
try { try {
consumer?.messageHandler = null c.close()
consumer?.close()
} catch(e: ActiveMQObjectClosedException) { } catch(e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do. // Ignore it: this can happen if the server has gone away before we do.
} }
consumer = null consumer = null
running
}
if (running && !executor.isOnThread) {
// Wait for the main loop to notice the consumer has gone and finish up.
shutdownLatch.await()
}
state.locked {
for (producer in producers.values) producer.close() for (producer in producers.values) producer.close()
producers.clear() producers.clear()

View File

@ -28,6 +28,7 @@ interface AffinityExecutor : Executor {
execute(runnable) execute(runnable)
} }
// TODO: Rename this to executeWithResult
/** /**
* Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this * Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this
* way! Make sure the executor can't possibly be waiting for the calling thread. * way! Make sure the executor can't possibly be waiting for the calling thread.

View File

@ -16,9 +16,8 @@ import org.junit.rules.TemporaryFolder
import java.net.ServerSocket import java.net.ServerSocket
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull import kotlin.test.assertNull
class ArtemisMessagingTests { class ArtemisMessagingTests {
@ -66,8 +65,9 @@ class ArtemisMessagingTests {
createMessagingServer(serverAddress).start() createMessagingServer(serverAddress).start()
val messagingClient = createMessagingClient(server = invalidServerAddress) messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient.start() } assertThatThrownBy { messagingClient!!.start() }
messagingClient = null
} }
@Test @Test
@ -84,6 +84,7 @@ class ArtemisMessagingTests {
val messagingClient = createMessagingClient() val messagingClient = createMessagingClient()
messagingClient.start() messagingClient.start()
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r -> messagingClient.addMessageHandler(topic) { message, r ->
receivedMessages.add(message) receivedMessages.add(message)
@ -92,8 +93,7 @@ class ArtemisMessagingTests {
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress) messagingClient.send(message, messagingClient.myAddress)
val actual = receivedMessages.poll(2, SECONDS) val actual: Message = receivedMessages.take()
assertNotNull(actual)
assertEquals("first msg", String(actual.data)) assertEquals("first msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS)) assertNull(receivedMessages.poll(200, MILLISECONDS))
} }

View File

@ -336,11 +336,7 @@ private fun runNode(cliParams: CliParams.RunNode): Int {
runUploadRates(cliParams.apiAddress) runUploadRates(cliParams.apiAddress)
} }
try { node.run()
while (true) Thread.sleep(Long.MAX_VALUE)
} catch(e: InterruptedException) {
node.stop()
}
} catch (e: NotSetupException) { } catch (e: NotSetupException) {
log.error(e.message) log.error(e.message)
return 1 return 1
@ -397,7 +393,7 @@ private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipi
} }
val node = logElapsedTime("Node startup", log) { val node = logElapsedTime("Node startup", log) {
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start() Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).setup().start()
} }
return node return node

View File

@ -2,22 +2,20 @@ package com.r3corda.demos
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.contracts.CommercialPaper import com.r3corda.contracts.CommercialPaper
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
import com.r3corda.contracts.asset.cashBalances import com.r3corda.contracts.asset.cashBalances
import com.r3corda.contracts.testing.fillWithSomeTestCash import com.r3corda.contracts.testing.fillWithSomeTestCash
import com.r3corda.core.*
import com.r3corda.core.contracts.* import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.days
import com.r3corda.core.logElapsedTime
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.seconds
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.Emoji
import com.r3corda.core.utilities.LogHelper import com.r3corda.core.utilities.LogHelper
@ -40,7 +38,7 @@ import java.nio.file.Paths
import java.security.PublicKey import java.security.PublicKey
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import kotlin.concurrent.thread
import kotlin.system.exitProcess import kotlin.system.exitProcess
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -73,10 +71,6 @@ val DEFAULT_BASE_DIRECTORY = "./build/trader-demo"
private val log: Logger = LoggerFactory.getLogger("TraderDemo") private val log: Logger = LoggerFactory.getLogger("TraderDemo")
fun main(args: Array<String>) { fun main(args: Array<String>) {
exitProcess(runTraderDemo(args))
}
fun runTraderDemo(args: Array<String>): Int {
val parser = OptionParser() val parser = OptionParser()
val roleArg = parser.accepts("role").withRequiredArg().ofType(Role::class.java).required() val roleArg = parser.accepts("role").withRequiredArg().ofType(Role::class.java).required()
@ -90,7 +84,7 @@ fun runTraderDemo(args: Array<String>): Int {
} catch (e: Exception) { } catch (e: Exception) {
log.error(e.message) log.error(e.message)
printHelp(parser) printHelp(parser)
return 1 exitProcess(1)
} }
val role = options.valueOf(roleArg)!! val role = options.valueOf(roleArg)!!
@ -160,12 +154,13 @@ fun runTraderDemo(args: Array<String>): Int {
if (role == Role.BUYER) { if (role == Role.BUYER) {
runBuyer(node, amount) runBuyer(node, amount)
} else { } else {
node.networkMapRegistrationFuture.get() node.networkMapRegistrationFuture.success {
val party = node.netMapCache.getNodeByLegalName("Bank A")?.identity ?: throw IllegalStateException("Cannot find other node?!") val party = node.netMapCache.getNodeByLegalName("Bank A")?.identity ?: throw IllegalStateException("Cannot find other node?!")
runSeller(node, amount, party) runSeller(node, amount, party)
} }
}
return 0 node.run()
} }
private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) { private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
@ -182,19 +177,21 @@ private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
} }
} }
var tradeTX: SignedTransaction? = null val tradeTX: ListenableFuture<SignedTransaction>
if (node.isPreviousCheckpointsPresent) { if (node.isPreviousCheckpointsPresent) {
node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach { tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second
tradeTX = it.second.get()
}
} else { } else {
val seller = TraderDemoProtocolSeller(otherSide, amount) val seller = TraderDemoProtocolSeller(otherSide, amount)
tradeTX = node.smm.add("demo.seller", seller).get() tradeTX = node.smm.add("demo.seller", seller)
} }
println("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(tradeTX!!.tx)}")
tradeTX.success {
println("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(it.tx)}")
thread {
node.stop() node.stop()
} }
}
}
private fun runBuyer(node: Node, amount: Amount<Currency>) { private fun runBuyer(node: Node, amount: Amount<Currency>) {
// Buyer will fetch the attachment from the seller automatically when it resolves the transaction. // Buyer will fetch the attachment from the seller automatically when it resolves the transaction.
@ -222,13 +219,11 @@ private fun runBuyer(node: Node, amount: Amount<Currency>) {
val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount) val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount)
node.smm.add("demo.buyer", buyer) node.smm.add("demo.buyer", buyer)
} }
CountDownLatch(1).await() // Prevent the application from terminating
} }
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic. // We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
val DEMO_TOPIC = "initiate.demo.trade" private val DEMO_TOPIC = "initiate.demo.trade"
private class TraderDemoProtocolBuyer(val otherSide: Party, private class TraderDemoProtocolBuyer(val otherSide: Party,
private val attachmentsPath: Path, private val attachmentsPath: Path,