Mirror of https://github.com/corda/corda.git (synced 2025-05-02 08:43:15 +00:00)
Messaging layer improvements:

- Fix thread safety issues in ArtemisMessagingClient. The Artemis API isn't thread safe, but that isn't well documented, and it will happily invoke callbacks in parallel.
- Add a discussion of how we currently tackle threading in the codebase, and make a few other improvements.
- Add a shutdown hook so we stop properly when the user presses Ctrl-C.

Parent: 493f7f1fd1
Commit: ac81d2aa32
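
For context on the first bullet: because Artemis may invoke client callbacks from several of its own threads at once, any mutable state those callbacks touch has to be guarded. A minimal illustration of that pattern using only the JDK (the class and names here are illustrative, not the actual Corda types):

    import java.util.concurrent.locks.ReentrantLock
    import kotlin.concurrent.withLock

    // Hypothetical client-side registry that messaging callbacks may touch concurrently.
    class HandlerRegistry {
        private val lock = ReentrantLock()
        private val handlers = mutableMapOf<String, (ByteArray) -> Unit>()

        // All access to the map goes through the lock, because the broker client
        // may call back from more than one thread in parallel.
        fun register(topic: String, handler: (ByteArray) -> Unit) = lock.withLock {
            handlers[topic] = handler
        }

        fun dispatch(topic: String, payload: ByteArray) {
            // Look the handler up under the lock, but run it outside the lock so a
            // slow handler cannot block registrations or deadlock on re-entry.
            val handler = lock.withLock { handlers[topic] } ?: return
            handler(payload)
        }
    }
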
@@ -159,6 +159,7 @@ class ThreadBox<out T>(val content: T, val lock: ReentrantLock = ReentrantLock())
         check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
         return body(content)
     }
+    fun checkNotLocked() = check(!lock.isHeldByCurrentThread)
 }

 /**
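
For readers who have not seen ThreadBox before, here is a rough usage sketch based only on the members visible in this diff (locked, alreadyLocked and the new checkNotLocked); treat it as illustrative rather than a copy of the real core utility:

    import com.r3corda.core.ThreadBox

    private class MutableState {
        val pending = mutableListOf<String>()
    }

    private val state = ThreadBox(MutableState())

    fun enqueue(item: String) = state.locked {
        // Inside locked {} the lock is held and the protected content is the receiver.
        pending.add(item)
    }

    private fun drainAssumingLocked() = state.alreadyLocked {
        // alreadyLocked {} asserts that the caller is already holding the lock.
        pending.clear()
    }

    fun callOutToUserCode(callback: () -> Unit) {
        // The new checkNotLocked() asserts the opposite: that we are NOT holding our
        // own lock before handing control to potentially re-entrant user code.
        state.checkNotLocked()
        callback()
    }
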
@@ -8,10 +8,11 @@ import com.r3corda.core.node.NodeInfo
 import com.r3corda.core.node.services.NetworkMapCache
 import com.r3corda.core.node.services.ServiceType
 import com.r3corda.node.services.config.NodeConfiguration
-import com.r3corda.node.services.messaging.ArtemisMessagingClient
 import com.r3corda.node.services.config.NodeConfigurationFromConfig
+import com.r3corda.node.services.messaging.ArtemisMessagingClient
 import com.r3corda.node.services.network.InMemoryNetworkMapCache
 import com.r3corda.node.services.network.NetworkMapService
+import com.r3corda.node.utilities.AffinityExecutor
 import com.typesafe.config.Config
 import com.typesafe.config.ConfigRenderOptions
 import org.slf4j.Logger
@@ -207,7 +208,8 @@ class DriverDSL(
                 Paths.get(baseDirectory, "driver-artemis"),
                 driverNodeConfiguration,
                 serverHostPort = networkMapAddress,
-                myHostPort = portAllocation.nextHostAndPort()
+                myHostPort = portAllocation.nextHostAndPort(),
+                executor = AffinityExecutor.ServiceAffinityExecutor("Client thread", 1)
         )
         var messagingServiceStarted = false

@@ -220,9 +222,7 @@ class DriverDSL(
         }

         override fun shutdown() {
-            registeredProcesses.forEach {
-                it.destroy()
-            }
+            registeredProcesses.forEach(Process::destroy)
             /** Wait 5 seconds, then [Process.destroyForcibly] */
             val finishedFuture = Executors.newSingleThreadExecutor().submit {
                 waitForAllNodesToFinish()
@@ -235,9 +235,8 @@ class DriverDSL(
                 it.destroyForcibly()
             }
         }
-        if (messagingServiceStarted){
+        if (messagingServiceStarted)
             messagingService.stop()
-        }

         // Check that we shut down properly
         addressMustNotBeBound(messagingService.myHostPort)
@@ -361,7 +360,7 @@ class DriverDSL(
         ): Process {

             // Write node.conf
-            writeConfig("${cliParams.baseDirectory}", "node.conf", config)
+            writeConfig(cliParams.baseDirectory, "node.conf", config)

             val className = NodeRunner::class.java.canonicalName
             val separator = System.getProperty("file.separator")
@@ -2,6 +2,7 @@ package com.r3corda.node.internal

 import com.codahale.metrics.JmxReporter
 import com.google.common.net.HostAndPort
+import com.google.common.util.concurrent.MoreExecutors
 import com.r3corda.core.node.NodeInfo
 import com.r3corda.core.node.ServiceHub
 import com.r3corda.core.node.services.ServiceType
@@ -31,12 +32,12 @@ import java.net.InetSocketAddress
 import java.nio.channels.FileLock
 import java.nio.file.Path
 import java.time.Clock
+import java.util.concurrent.TimeUnit
 import javax.management.ObjectName
+import kotlin.concurrent.thread

 class ConfigurationException(message: String) : Exception(message)

-// TODO: Split this into a regression testing environment
-
 /**
  * A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
  * loads important data off disk and starts listening for connections.
@@ -63,6 +64,43 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,

     override val log = loggerFor<Node>()

+    // DISCUSSION
+    //
+    // We use a single server thread for now, which means all message handling is serialized.
+    //
+    // Writing thread safe code is hard. In this project we are writing most node services and code to be thread safe, but
+    // the possibility of mistakes is always present. Thus we make a deliberate decision here to trade off some multi-core
+    // scalability in order to gain developer productivity by setting the size of the serverThread pool to one, which will
+    // reduce the number of threading bugs we will need to tackle.
+    //
+    // This leaves us with four possibilities in future:
+    //
+    // (1) We discover that processing messages is fast and that our eventual use cases do not need very high
+    //     processing rates. We have benefited from the higher productivity and not lost anything.
+    //
+    // (2) We discover that we need greater multi-core scalability, but that the bulk of our time goes into particular CPU
+    //     hotspots that are easily multi-threaded e.g. signature checking. We successfully multi-thread those hotspots
+    //     and find that our software now scales sufficiently well to satisfy our users' needs.
+    //
+    // (3) We discover that it wasn't enough, but that we only need to run some messages in parallel and that the bulk of
+    //     the work can stay single threaded. For example perhaps we find that latency sensitive UI requests must be handled
+    //     on a separate thread pool where long blocking operations are not allowed, but that the bulk of the heavy lifting
+    //     can stay single threaded. In this case we would need a separate thread pool, but we still minimise the amount of
+    //     thread safe code we need to write and test.
+    //
+    // (4) None of the above are sufficient and we need to run all messages in parallel to get maximum (single machine)
+    //     scalability and fully saturate all cores. In that case we can go fully free-threaded, e.g. change the number '1'
+    //     below to some multiple of the core count, or use a ForkJoinPool and let it figure out the right
+    //     number of threads by itself. This will require some investment in stress testing to build confidence that we
+    //     haven't made any mistakes, but it will only be necessary if eventual deployment scenarios demand it.
+    //
+    // Note that the messaging subsystem schedules work onto this thread in a blocking manner. That means if the server
+    // thread becomes too slow and a backlog of work starts to build up it propagates back through into the messaging
+    // layer, which can then react to the backpressure. Artemis MQ in particular knows how to do flow control by paging
+    // messages to disk rather than letting us run out of RAM.
+    //
+    // The primary work done by the server thread is execution of protocol logics, and related
+    // serialisation/deserialisation work.
     override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)

     lateinit var webServer: Server
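
The trade-off described in the comment block above is easy to picture with a plain JDK executor; the sketch below is a generic analogy, not the AffinityExecutor used by the node. If option (4) ever became necessary, the same call site could simply be handed a larger pool or a ForkJoinPool instead.

    import java.util.concurrent.Executors

    fun main() {
        // One worker thread, analogous to giving serverThread a pool size of 1:
        // every submitted task runs strictly after the previous one finishes, so
        // state touched only from these tasks needs no extra locking.
        val serverThread = Executors.newSingleThreadExecutor { r -> Thread(r, "Node thread") }

        var counter = 0
        repeat(1000) { serverThread.execute { counter++ } }

        // Read the result on the same single thread: no locks or atomics needed,
        // because everything touching 'counter' is serialised onto that thread.
        serverThread.execute { println("handled $counter messages") }  // prints 1000
        serverThread.shutdown()
    }
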
@@ -180,6 +218,11 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
                 }.
                 build().
                 start()
+
+        Runtime.getRuntime().addShutdownHook(thread(start = false) {
+            stop()
+        })
+
         return this
     }

@@ -188,12 +231,23 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
         return this
     }

+    private var shutdown = false
+
     override fun stop() {
+        synchronized(this) {
+            if (shutdown) return
+            shutdown = true
+        }
+        log.info("Shutting down ...")
+        // Shut down the web server.
         webServer.stop()
+        // Terminate the messaging system. This will block until messages that are in-flight have finished being
+        // processed so it may take a moment.
         super.stop()
+        MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS)
         messageBroker?.stop()
         nodeFileLock!!.release()
-        serverThread.shutdownNow()
+        log.info("Shutdown complete")
     }

     private fun alreadyRunningNodeCheck() {
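
Taken together, the two Node hunks above boil down to a small, reusable pattern: register a JVM shutdown hook that drives an idempotent stop(). A standalone sketch of just that pattern, using only JDK and Guava calls that already appear in the diff (the MiniNode class itself is hypothetical):

    import com.google.common.util.concurrent.MoreExecutors
    import java.util.concurrent.Executors
    import java.util.concurrent.TimeUnit
    import kotlin.concurrent.thread

    class MiniNode {
        private val serverThread = Executors.newSingleThreadExecutor()
        private var shutdown = false

        fun start(): MiniNode {
            // Register the hook unstarted; the JVM starts it on Ctrl-C or SIGTERM.
            Runtime.getRuntime().addShutdownHook(thread(start = false) { stop() })
            return this
        }

        fun stop() {
            // Idempotent: an explicit stop() followed by the shutdown hook firing
            // (or the other way round) must not run the teardown twice.
            synchronized(this) {
                if (shutdown) return
                shutdown = true
            }
            // Let queued work drain, waiting up to 50 seconds before giving up.
            MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS)
        }
    }
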
@@ -1,13 +1,15 @@
 package com.r3corda.node.services.messaging

 import com.google.common.net.HostAndPort
-import com.r3corda.core.RunOnCallerThread
 import com.r3corda.core.ThreadBox
 import com.r3corda.core.messaging.*
+import com.r3corda.core.serialization.opaque
 import com.r3corda.core.utilities.loggerFor
 import com.r3corda.node.internal.Node
 import com.r3corda.node.services.api.MessagingServiceInternal
 import com.r3corda.node.services.config.NodeConfiguration
+import com.r3corda.node.utilities.AffinityExecutor
+import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
 import org.apache.activemq.artemis.api.core.SimpleString
 import org.apache.activemq.artemis.api.core.client.*
 import java.nio.file.FileSystems
@@ -20,18 +22,22 @@ import javax.annotation.concurrent.ThreadSafe

 /**
  * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
- * Artemis is a message queue broker and here we run a client connecting to the specified broker instance [ArtemisMessagingServer]
+ * Artemis is a message queue broker and here we run a client connecting to the specified broker instance
+ * [ArtemisMessagingServer].
+ *
+ * Message handlers are run on the provided [AffinityExecutor] synchronously, that is, the Artemis callback threads
+ * are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given executor
+ * through into Artemis and from there, back through to senders.
  *
  * @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
  * @param myHostPort What host and port to use as an address for incoming messages
- * @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified.
  */
 @ThreadSafe
 class ArtemisMessagingClient(directory: Path,
                              config: NodeConfiguration,
                              val serverHostPort: HostAndPort,
                              val myHostPort: HostAndPort,
-                             val defaultExecutor: Executor = RunOnCallerThread) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
+                             val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
     companion object {
         val log = loggerFor<ArtemisMessagingClient>()

@@ -53,6 +59,9 @@ class ArtemisMessagingClient(directory: Path,
     private class InnerState {
         var running = false
         val producers = HashMap<Address, ClientProducer>()
+        var consumer: ClientConsumer? = null
+        var session: ClientSession? = null
+        var clientFactory: ClientSessionFactory? = null
     }

     /** A registration to handle messages of different types */
@@ -62,14 +71,9 @@ class ArtemisMessagingClient(directory: Path,

     override val myAddress: SingleMessageRecipient = Address(myHostPort)

-    private val mutex = ThreadBox(InnerState())
+    private val state = ThreadBox(InnerState())
     private val handlers = CopyOnWriteArrayList<Handler>()

-    private var serverLocator: ServerLocator? = null
-    private var clientFactory: ClientSessionFactory? = null
-    private var session: ClientSession? = null
-    private var consumer: ClientConsumer? = null
-
     // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
     private val undeliveredMessages = CopyOnWriteArrayList<Message>()

@@ -77,43 +81,42 @@ class ArtemisMessagingClient(directory: Path,
         require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
     }

-    fun start() = mutex.locked {
-        if (!running) {
-            configureAndStartClient()
+    fun start() {
+        state.locked {
+            check(!running)
             running = true

+            log.info("Connecting to server: $serverHostPort")
+            // Connect to our server.
+            val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)
+            val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
+            clientFactory = locator.createSessionFactory()
+
+            // Create a queue on which to receive messages and set up the handler.
+            val session = clientFactory!!.createSession()
+            this.session = session
+
+            val address = myHostPort.toString()
+            val queueName = myHostPort.toString()
+            session.createQueue(address, queueName, false)
+            consumer = session.createConsumer(queueName).setMessageHandler { artemisMessage: ClientMessage ->
+                val message: Message? = artemisToCordaMessage(artemisMessage)
+                if (message != null)
+                    deliver(message)
+            }
+            session.start()
         }
     }

-    private fun configureAndStartClient() {
-        log.info("Connecting to server: $serverHostPort")
-        // Connect to our server.
-        val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(
-                tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port))
-        this.serverLocator = serverLocator
-        val clientFactory = serverLocator.createSessionFactory()
-        this.clientFactory = clientFactory
-
-        // Create a queue on which to receive messages and set up the handler.
-        val session = clientFactory.createSession()
-        this.session = session
-
-        val address = myHostPort.toString()
-        val queueName = myHostPort.toString()
-        session.createQueue(address, queueName, false)
-        consumer = session.createConsumer(queueName).setMessageHandler { message: ClientMessage -> handleIncomingMessage(message) }
-        session.start()
-    }
-
-    private fun handleIncomingMessage(message: ClientMessage) {
-        // This code runs for every inbound message.
+    private fun artemisToCordaMessage(message: ClientMessage): Message? {
         try {
             if (!message.containsProperty(TOPIC_PROPERTY)) {
                 log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
-                return
+                return null
             }
             if (!message.containsProperty(SESSION_ID_PROPERTY)) {
                 log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
-                return
+                return null
             }
             val topic = message.getStringProperty(TOPIC_PROPERTY)
             val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
@@ -126,18 +129,17 @@ class ArtemisMessagingClient(directory: Path,
                 override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
                 override val debugMessageID: String = message.messageID.toString()
                 override fun serialise(): ByteArray = body
-                override fun toString() = topic + "#" + String(data)
+                override fun toString() = topic + "#" + data.opaque()
             }

-            deliverMessage(msg)
-        } finally {
-            // TODO the message is delivered onto an executor and so we may be acking the message before we've
-            // finished processing it
-            message.acknowledge()
+            return msg
+        } catch (e: Exception) {
+            log.error("Internal error whilst reading MQ message", e)
+            return null
         }
     }

-    private fun deliverMessage(msg: Message): Boolean {
+    private fun deliver(msg: Message): Boolean {
         // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
         // or removed whilst the filter is executing will not affect anything.
         val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
@@ -155,57 +157,78 @@ class ArtemisMessagingClient(directory: Path,
         }

         for (handler in deliverTo) {
-            (handler.executor ?: defaultExecutor).execute {
-                try {
-                    handler.callback(msg, handler)
-                } catch(e: Exception) {
-                    log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
-                }
+            try {
+                // This will perform a BLOCKING call onto the executor, although we are not actually 'fetching' anything
+                // from the thread as the callbacks don't return anything. Thus if the handlers are slow, we will be slow,
+                // and Artemis can handle that case intelligently. We don't just invoke the handler directly in order to
+                // ensure that we have the features of the AffinityExecutor class throughout the bulk of the codebase.
+                //
+                // Note that handlers may re-enter this class. We aren't holding any locks at this point, so that's OK.
+                state.checkNotLocked()
+                executor.fetchFrom {
+                    handler.callback(msg, handler)
+                }
+            } catch(e: Exception) {
+                log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
             }
         }

         return true
     }

-    override fun stop() = mutex.locked {
-        for (producer in producers.values) producer.close()
-        producers.clear()
-        consumer?.close()
-        session?.close()
-        clientFactory?.close()
-        serverLocator?.close()
-        // We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
-        running = false
+    override fun stop() {
+        state.locked {
+            if (clientFactory == null)
+                return // Was never started to begin with, so just ignore.
+
+            // Setting the message handler to null here will block until there are no more threads running the handler,
+            // so once we come back we know we can close the consumer and no more messages are being processed
+            // anywhere, due to the blocking delivery.
+            try {
+                consumer?.messageHandler = null
+                consumer?.close()
+            } catch(e: ActiveMQObjectClosedException) {
+                // Ignore it: this can happen if the server has gone away before we do.
+            }
+            consumer = null
+
+            for (producer in producers.values) producer.close()
+            producers.clear()
+
+            // Closing the factory closes all the sessions it produced as well.
+            clientFactory!!.close()
+            clientFactory = null
+        }
     }

     override fun send(message: Message, target: MessageRecipients) {
         if (target !is Address)
             TODO("Only simple sends to single recipients are currently implemented")
-        val artemisMessage = session!!.createMessage(true).apply {
-            val sessionID = message.topicSession.sessionID
-            putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
-            putLongProperty(SESSION_ID_PROPERTY, sessionID)
-            writeBodyBufferBytes(message.data)
-        }
-        getProducerForAddress(target).send(artemisMessage)
-    }
-
-    private fun getProducerForAddress(address: Address): ClientProducer {
-        return mutex.locked {
-            producers.getOrPut(address) {
-                if (address != myAddress) {
-                    maybeCreateQueue(address.hostAndPort)
-                }
-                session!!.createProducer(address.hostAndPort.toString())
-            }
+        state.locked {
+            val artemisMessage = session!!.createMessage(true).apply {
+                val sessionID = message.topicSession.sessionID
+                putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
+                putLongProperty(SESSION_ID_PROPERTY, sessionID)
+                writeBodyBufferBytes(message.data)
+            }
+
+            val producer = producers.getOrPut(target) {
+                if (target != myAddress)
+                    maybeCreateQueue(target.hostAndPort)
+                session!!.createProducer(target.hostAndPort.toString())
+            }
+            producer.send(artemisMessage)
         }
     }

     private fun maybeCreateQueue(hostAndPort: HostAndPort) {
-        val name = hostAndPort.toString()
-        val queueQuery = session!!.queueQuery(SimpleString(name))
-        if (!queueQuery.isExists) {
-            session!!.createQueue(name, name, true /* durable */)
+        state.alreadyLocked {
+            val name = hostAndPort.toString()
+            val queueQuery = session!!.queueQuery(SimpleString(name))
+            if (!queueQuery.isExists) {
+                session!!.createQueue(name, name, true /* durable */)
+            }
         }
     }

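
The rewritten deliver loop above depends on executor.fetchFrom to run each handler on the node's executor while blocking the Artemis callback thread until it completes; that blocking is what turns slow handlers into backpressure. The diff does not show fetchFrom itself, but the general shape of such a blocking hand-off over a plain java.util.concurrent.Executor looks roughly like this (hypothetical helper, not the actual AffinityExecutor API):

    import java.util.concurrent.CompletableFuture
    import java.util.concurrent.Executor

    // Run 'block' on the target executor and block the calling thread until it
    // completes; failures surface from future.get() as an ExecutionException.
    fun <T> Executor.fetchBlocking(block: () -> T): T {
        val future = CompletableFuture<T>()
        execute {
            try {
                future.complete(block())
            } catch (e: Throwable) {
                future.completeExceptionally(e)
            }
        }
        return future.get()
    }

Because the caller waits, the Artemis thread cannot race ahead of the handlers, and the broker's own flow control (for example paging messages to disk) takes over when senders outpace the node.
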
@@ -219,7 +242,7 @@ class ArtemisMessagingClient(directory: Path,
         require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
         val handler = Handler(executor, topicSession, callback)
         handlers.add(handler)
-        undeliveredMessages.removeIf { deliverMessage(it) }
+        undeliveredMessages.removeIf { deliver(it) }
         return handler
     }

@@ -7,6 +7,7 @@ import com.r3corda.core.testing.freeLocalHostAndPort
 import com.r3corda.node.services.config.NodeConfiguration
 import com.r3corda.node.services.messaging.ArtemisMessagingClient
 import com.r3corda.node.services.messaging.ArtemisMessagingServer
+import com.r3corda.node.utilities.AffinityExecutor
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.After
 import org.junit.Rule
@@ -99,7 +100,7 @@ class ArtemisMessagingTests {

     private fun createMessagingClient(server: HostAndPort = hostAndPort,
                                       local: HostAndPort = hostAndPort): ArtemisMessagingClient {
-        return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local).apply {
+        return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply {
             configureWithDevSSLCertificate()
             messagingClient = this
         }