mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
Merged in mike-fix-artemis-threading (pull request #276)
Messaging layer improvements
This commit is contained in:
commit
a451000623
@ -160,6 +160,7 @@ class ThreadBox<out T>(val content: T, val lock: ReentrantLock = ReentrantLock()
|
||||
check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
|
||||
return body(content)
|
||||
}
|
||||
fun checkNotLocked() = check(!lock.isHeldByCurrentThread)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -42,18 +42,12 @@ fun main(args: Array<String>) {
|
||||
|
||||
try {
|
||||
val dirFile = dir.toFile()
|
||||
if (!dirFile.exists()) {
|
||||
if (!dirFile.exists())
|
||||
dirFile.mkdirs()
|
||||
}
|
||||
|
||||
val node = conf.createNode()
|
||||
node.start()
|
||||
try {
|
||||
// TODO create a proper daemon and/or provide some console handler to give interactive commands
|
||||
while (true) Thread.sleep(Long.MAX_VALUE)
|
||||
} catch(e: InterruptedException) {
|
||||
node.stop()
|
||||
}
|
||||
node.run()
|
||||
} catch (e: Exception) {
|
||||
log.error("Exception during node startup", e)
|
||||
System.exit(1)
|
||||
|
@ -8,10 +8,11 @@ import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import org.slf4j.Logger
|
||||
@ -26,6 +27,7 @@ import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/**
|
||||
* This file defines a small "Driver" DSL for starting up nodes.
|
||||
@ -207,7 +209,8 @@ class DriverDSL(
|
||||
Paths.get(baseDirectory, "driver-artemis"),
|
||||
driverNodeConfiguration,
|
||||
serverHostPort = networkMapAddress,
|
||||
myHostPort = portAllocation.nextHostAndPort()
|
||||
myHostPort = portAllocation.nextHostAndPort(),
|
||||
executor = AffinityExecutor.ServiceAffinityExecutor("Client thread", 1)
|
||||
)
|
||||
var messagingServiceStarted = false
|
||||
|
||||
@ -220,9 +223,7 @@ class DriverDSL(
|
||||
}
|
||||
|
||||
override fun shutdown() {
|
||||
registeredProcesses.forEach {
|
||||
it.destroy()
|
||||
}
|
||||
registeredProcesses.forEach(Process::destroy)
|
||||
/** Wait 5 seconds, then [Process.destroyForcibly] */
|
||||
val finishedFuture = Executors.newSingleThreadExecutor().submit {
|
||||
waitForAllNodesToFinish()
|
||||
@ -235,9 +236,8 @@ class DriverDSL(
|
||||
it.destroyForcibly()
|
||||
}
|
||||
}
|
||||
if (messagingServiceStarted){
|
||||
if (messagingServiceStarted)
|
||||
messagingService.stop()
|
||||
}
|
||||
|
||||
// Check that we shut down properly
|
||||
addressMustNotBeBound(messagingService.myHostPort)
|
||||
@ -293,6 +293,7 @@ class DriverDSL(
|
||||
startNetworkMapService()
|
||||
messagingService.configureWithDevSSLCertificate()
|
||||
messagingService.start()
|
||||
thread { messagingService.run() }
|
||||
messagingServiceStarted = true
|
||||
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
|
||||
// the network map service itself
|
||||
@ -361,7 +362,7 @@ class DriverDSL(
|
||||
): Process {
|
||||
|
||||
// Write node.conf
|
||||
writeConfig("${cliParams.baseDirectory}", "node.conf", config)
|
||||
writeConfig(cliParams.baseDirectory, "node.conf", config)
|
||||
|
||||
val className = NodeRunner::class.java.canonicalName
|
||||
val separator = System.getProperty("file.separator")
|
||||
|
@ -8,8 +8,8 @@ import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.config.NodeConfigurationFromConfig
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import joptsimple.ArgumentAcceptingOptionSpec
|
||||
import joptsimple.OptionParser
|
||||
@ -64,6 +64,7 @@ class NodeRunner {
|
||||
|
||||
log.info("Starting ${nodeConfiguration.myLegalName} with services $services on addresses $messagingAddress and $apiAddress")
|
||||
node.start()
|
||||
node.run()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package com.r3corda.node.internal
|
||||
|
||||
import com.codahale.metrics.JmxReporter
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
@ -31,12 +32,12 @@ import java.net.InetSocketAddress
|
||||
import java.nio.channels.FileLock
|
||||
import java.nio.file.Path
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.management.ObjectName
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
class ConfigurationException(message: String) : Exception(message)
|
||||
|
||||
// TODO: Split this into a regression testing environment
|
||||
|
||||
/**
|
||||
* A Node manages a standalone server that takes part in the P2P network. It creates the services found in [ServiceHub],
|
||||
* loads important data off disk and starts listening for connections.
|
||||
@ -63,6 +64,43 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
|
||||
override val log = loggerFor<Node>()
|
||||
|
||||
// DISCUSSION
|
||||
//
|
||||
// We use a single server thread for now, which means all message handling is serialized.
|
||||
//
|
||||
// Writing thread safe code is hard. In this project we are writing most node services and code to be thread safe, but
|
||||
// the possibility of mistakes is always present. Thus we make a deliberate decision here to trade off some multi-core
|
||||
// scalability in order to gain developer productivity by setting the size of the serverThread pool to one, which will
|
||||
// reduce the number of threading bugs we will need to tackle.
|
||||
//
|
||||
// This leaves us with four possibilities in future:
|
||||
//
|
||||
// (1) We discover that processing messages is fast and that our eventual use cases do not need very high
|
||||
// processing rates. We have benefited from the higher productivity and not lost anything.
|
||||
//
|
||||
// (2) We discover that we need greater multi-core scalability, but that the bulk of our time goes into particular CPU
|
||||
// hotspots that are easily multi-threaded e.g. signature checking. We successfully multi-thread those hotspots
|
||||
// and find that our software now scales sufficiently well to satisfy our user's needs.
|
||||
//
|
||||
// (3) We discover that it wasn't enough, but that we only need to run some messages in parallel and that the bulk of
|
||||
// the work can stay single threaded. For example perhaps we find that latency sensitive UI requests must be handled
|
||||
// on a separate thread pool where long blocking operations are not allowed, but that the bulk of the heavy lifting
|
||||
// can stay single threaded. In this case we would need a separate thread pool, but we still minimise the amount of
|
||||
// thread safe code we need to write and test.
|
||||
//
|
||||
// (4) None of the above are sufficient and we need to run all messages in parallel to get maximum (single machine)
|
||||
// scalability and fully saturate all cores. In that case we can go fully free-threaded, e.g. change the number '1'
|
||||
// below to some multiple of the core count. Alternatively by using the ForkJoinPool and let it figure out the right
|
||||
// number of threads by itself. This will require some investment in stress testing to build confidence that we
|
||||
// haven't made any mistakes, but it will only be necessary if eventual deployment scenarios demand it.
|
||||
//
|
||||
// Note that the messaging subsystem schedules work onto this thread in a blocking manner. That means if the server
|
||||
// thread becomes too slow and a backlog of work starts to builds up it propagates back through into the messaging
|
||||
// layer, which can then react to the backpressure. Artemis MQ in particular knows how to do flow control by paging
|
||||
// messages to disk rather than letting us run out of RAM.
|
||||
//
|
||||
// The primary work done by the server thread is execution of protocol logics, and related
|
||||
// serialisation/deserialisation work.
|
||||
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
|
||||
|
||||
lateinit var webServer: Server
|
||||
@ -180,20 +218,46 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
}.
|
||||
build().
|
||||
start()
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(thread(start = false) {
|
||||
stop()
|
||||
})
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/** Starts a blocking event loop for message dispatch. */
|
||||
fun run() {
|
||||
(net as ArtemisMessagingClient).run()
|
||||
}
|
||||
|
||||
// TODO: Do we really need setup?
|
||||
override fun setup(): Node {
|
||||
super.setup()
|
||||
return this
|
||||
}
|
||||
|
||||
private var shutdown = false
|
||||
|
||||
override fun stop() {
|
||||
check(!serverThread.isOnThread)
|
||||
synchronized(this) {
|
||||
if (shutdown) return
|
||||
shutdown = true
|
||||
}
|
||||
log.info("Shutting down ...")
|
||||
// Shut down the web server.
|
||||
webServer.stop()
|
||||
// Terminate the messaging system. This will block until messages that are in-flight have finished being
|
||||
// processed so it may take a moment.
|
||||
super.stop()
|
||||
// We do another wait here, even though any in-flight messages have been drained away now because the
|
||||
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
|
||||
// arbitrary and might be inappropriate.
|
||||
MoreExecutors.shutdownAndAwaitTermination(serverThread, 50, TimeUnit.SECONDS)
|
||||
messageBroker?.stop()
|
||||
nodeFileLock!!.release()
|
||||
serverThread.shutdownNow()
|
||||
log.info("Shutdown complete")
|
||||
}
|
||||
|
||||
private fun alreadyRunningNodeCheck() {
|
||||
|
@ -9,6 +9,12 @@ import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
|
||||
interface MessagingServiceInternal: MessagingService {
|
||||
/**
|
||||
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
|
||||
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
|
||||
* from a thread that's a part of the [AffinityExecutor] given to the constructor, it returns immediately and
|
||||
* shutdown is asynchronous.
|
||||
*/
|
||||
fun stop()
|
||||
}
|
||||
|
||||
|
@ -1,13 +1,15 @@
|
||||
package com.r3corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.RunOnCallerThread
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.serialization.opaque
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.nio.file.FileSystems
|
||||
@ -15,23 +17,28 @@ import java.nio.file.Path
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executor
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
|
||||
* Artemis is a message queue broker and here we run a client connecting to the specified broker instance [ArtemisMessagingServer]
|
||||
* Artemis is a message queue broker and here we run a client connecting to the specified broker instance
|
||||
* [ArtemisMessagingServer].
|
||||
*
|
||||
* Message handlers are run on the provided [AffinityExecutor] synchronously, that is, the Artemis callback threads
|
||||
* are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given executor
|
||||
* through into Artemis and from there, back through to senders.
|
||||
*
|
||||
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
|
||||
* @param myHostPort What host and port to use as an address for incoming messages
|
||||
* @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingClient(directory: Path,
|
||||
config: NodeConfiguration,
|
||||
val serverHostPort: HostAndPort,
|
||||
val myHostPort: HostAndPort,
|
||||
val defaultExecutor: Executor = RunOnCallerThread) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
|
||||
val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
|
||||
companion object {
|
||||
val log = loggerFor<ArtemisMessagingClient>()
|
||||
|
||||
@ -51,8 +58,12 @@ class ArtemisMessagingClient(directory: Path,
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
var started = false
|
||||
var running = false
|
||||
val producers = HashMap<Address, ClientProducer>()
|
||||
var consumer: ClientConsumer? = null
|
||||
var session: ClientSession? = null
|
||||
var clientFactory: ClientSessionFactory? = null
|
||||
}
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
@ -62,14 +73,9 @@ class ArtemisMessagingClient(directory: Path,
|
||||
|
||||
override val myAddress: SingleMessageRecipient = Address(myHostPort)
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
|
||||
private var serverLocator: ServerLocator? = null
|
||||
private var clientFactory: ClientSessionFactory? = null
|
||||
private var session: ClientSession? = null
|
||||
private var consumer: ClientConsumer? = null
|
||||
|
||||
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
|
||||
private val undeliveredMessages = CopyOnWriteArrayList<Message>()
|
||||
|
||||
@ -77,43 +83,86 @@ class ArtemisMessagingClient(directory: Path,
|
||||
require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
|
||||
}
|
||||
|
||||
fun start() = mutex.locked {
|
||||
if (!running) {
|
||||
configureAndStartClient()
|
||||
running = true
|
||||
fun start() {
|
||||
state.locked {
|
||||
check(!started) { "start can't be called twice" }
|
||||
started = true
|
||||
|
||||
log.info("Connecting to server: $serverHostPort")
|
||||
// Connect to our server.
|
||||
val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
|
||||
clientFactory = locator.createSessionFactory()
|
||||
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
val session = clientFactory!!.createSession()
|
||||
this.session = session
|
||||
|
||||
val address = myHostPort.toString()
|
||||
val queueName = myHostPort.toString()
|
||||
session.createQueue(address, queueName, false)
|
||||
consumer = session.createConsumer(queueName)
|
||||
session.start()
|
||||
}
|
||||
}
|
||||
|
||||
private fun configureAndStartClient() {
|
||||
log.info("Connecting to server: $serverHostPort")
|
||||
// Connect to our server.
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(
|
||||
tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port))
|
||||
this.serverLocator = serverLocator
|
||||
val clientFactory = serverLocator.createSessionFactory()
|
||||
this.clientFactory = clientFactory
|
||||
private var shutdownLatch = CountDownLatch(1)
|
||||
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
val session = clientFactory.createSession()
|
||||
this.session = session
|
||||
/** Starts the event loop: this method only returns once [stop] has been called. */
|
||||
fun run() {
|
||||
val consumer = state.locked {
|
||||
check(started)
|
||||
check(!running) { "run can't be called twice" }
|
||||
running = true
|
||||
consumer!!
|
||||
}
|
||||
|
||||
val address = myHostPort.toString()
|
||||
val queueName = myHostPort.toString()
|
||||
session.createQueue(address, queueName, false)
|
||||
consumer = session.createConsumer(queueName).setMessageHandler { message: ClientMessage -> handleIncomingMessage(message) }
|
||||
session.start()
|
||||
while (true) {
|
||||
// Two possibilities here:
|
||||
//
|
||||
// 1. We block waiting for a message and the consumer is closed in another thread. In this case
|
||||
// receive returns null and we break out of the loop.
|
||||
// 2. We receive a message and process it, and stop() is called during delivery. In this case,
|
||||
// calling receive will throw and we break out of the loop.
|
||||
//
|
||||
// It's safe to call into receive simultaneous with other threads calling send on a producer.
|
||||
val artemisMessage: ClientMessage = try {
|
||||
consumer.receive()
|
||||
} catch(e: ActiveMQObjectClosedException) {
|
||||
null
|
||||
} ?: break
|
||||
|
||||
val message: Message? = artemisToCordaMessage(artemisMessage)
|
||||
if (message != null)
|
||||
deliver(message)
|
||||
|
||||
// Ack the message so it won't be redelivered. We should only really do this when there were no
|
||||
// transient failures. If we caught an exception in the handler, we could back off and retry delivery
|
||||
// a few times before giving up and redirecting the message to a dead-letter address for admin or
|
||||
// developer inspection. Artemis has the features to do this for us, we just need to enable them.
|
||||
//
|
||||
// TODO: Setup Artemis delayed redelivery and dead letter addresses.
|
||||
//
|
||||
// ACKing a message calls back into the session which isn't thread safe, so we have to ensure it
|
||||
// doesn't collide with a send here. Note that stop() could have been called whilst we were
|
||||
// processing a message but if so, it'll be parked waiting for us to count down the latch, so
|
||||
// the session itself is still around and we can still ack messages as a result.
|
||||
state.locked {
|
||||
artemisMessage.acknowledge()
|
||||
}
|
||||
}
|
||||
shutdownLatch.countDown()
|
||||
}
|
||||
|
||||
private fun handleIncomingMessage(message: ClientMessage) {
|
||||
// This code runs for every inbound message.
|
||||
private fun artemisToCordaMessage(message: ClientMessage): Message? {
|
||||
try {
|
||||
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
||||
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
|
||||
return
|
||||
return null
|
||||
}
|
||||
if (!message.containsProperty(SESSION_ID_PROPERTY)) {
|
||||
log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
|
||||
return
|
||||
return null
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
|
||||
@ -126,18 +175,18 @@ class ArtemisMessagingClient(directory: Path,
|
||||
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
|
||||
override val debugMessageID: String = message.messageID.toString()
|
||||
override fun serialise(): ByteArray = body
|
||||
override fun toString() = topic + "#" + String(data)
|
||||
override fun toString() = topic + "#" + data.opaque()
|
||||
}
|
||||
|
||||
deliverMessage(msg)
|
||||
} finally {
|
||||
// TODO the message is delivered onto an executor and so we may be acking the message before we've
|
||||
// finished processing it
|
||||
message.acknowledge()
|
||||
return msg
|
||||
} catch (e: Exception) {
|
||||
log.error("Internal error whilst reading MQ message", e)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
private fun deliverMessage(msg: Message): Boolean {
|
||||
private fun deliver(msg: Message): Boolean {
|
||||
state.checkNotLocked()
|
||||
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
|
||||
// or removed whilst the filter is executing will not affect anything.
|
||||
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
|
||||
@ -155,57 +204,82 @@ class ArtemisMessagingClient(directory: Path,
|
||||
}
|
||||
|
||||
for (handler in deliverTo) {
|
||||
(handler.executor ?: defaultExecutor).execute {
|
||||
try {
|
||||
try {
|
||||
// This will perform a BLOCKING call onto the executor. Thus if the handlers are slow, we will
|
||||
// be slow, and Artemis can handle that case intelligently. We don't just invoke the handler
|
||||
// directly in order to ensure that we have the features of the AffinityExecutor class throughout
|
||||
// the bulk of the codebase and other non-messaging jobs can be scheduled onto the server executor
|
||||
// easily.
|
||||
//
|
||||
// Note that handlers may re-enter this class. We aren't holding any locks and methods like
|
||||
// start/run/stop have re-entrancy assertions at the top, so it is OK.
|
||||
executor.fetchFrom {
|
||||
handler.callback(msg, handler)
|
||||
} catch(e: Exception) {
|
||||
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun stop() = mutex.locked {
|
||||
for (producer in producers.values) producer.close()
|
||||
producers.clear()
|
||||
consumer?.close()
|
||||
session?.close()
|
||||
clientFactory?.close()
|
||||
serverLocator?.close()
|
||||
// We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
|
||||
running = false
|
||||
override fun stop() {
|
||||
val running = state.locked {
|
||||
// We allow stop() to be called without a run() in between, but it must have at least been started.
|
||||
check(started)
|
||||
|
||||
val c = consumer ?: throw IllegalStateException("stop can't be called twice")
|
||||
try {
|
||||
c.close()
|
||||
} catch(e: ActiveMQObjectClosedException) {
|
||||
// Ignore it: this can happen if the server has gone away before we do.
|
||||
}
|
||||
consumer = null
|
||||
running
|
||||
}
|
||||
if (running && !executor.isOnThread) {
|
||||
// Wait for the main loop to notice the consumer has gone and finish up.
|
||||
shutdownLatch.await()
|
||||
}
|
||||
state.locked {
|
||||
for (producer in producers.values) producer.close()
|
||||
producers.clear()
|
||||
|
||||
// Closing the factory closes all the sessions it produced as well.
|
||||
clientFactory!!.close()
|
||||
clientFactory = null
|
||||
}
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
if (target !is Address)
|
||||
TODO("Only simple sends to single recipients are currently implemented")
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
|
||||
putLongProperty(SESSION_ID_PROPERTY, sessionID)
|
||||
writeBodyBufferBytes(message.data)
|
||||
}
|
||||
getProducerForAddress(target).send(artemisMessage)
|
||||
}
|
||||
|
||||
private fun getProducerForAddress(address: Address): ClientProducer {
|
||||
return mutex.locked {
|
||||
producers.getOrPut(address) {
|
||||
if (address != myAddress) {
|
||||
maybeCreateQueue(address.hostAndPort)
|
||||
}
|
||||
session!!.createProducer(address.hostAndPort.toString())
|
||||
state.locked {
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
|
||||
putLongProperty(SESSION_ID_PROPERTY, sessionID)
|
||||
writeBodyBufferBytes(message.data)
|
||||
}
|
||||
|
||||
val producer = producers.getOrPut(target) {
|
||||
if (target != myAddress)
|
||||
maybeCreateQueue(target.hostAndPort)
|
||||
session!!.createProducer(target.hostAndPort.toString())
|
||||
}
|
||||
producer.send(artemisMessage)
|
||||
}
|
||||
}
|
||||
|
||||
private fun maybeCreateQueue(hostAndPort: HostAndPort) {
|
||||
val name = hostAndPort.toString()
|
||||
val queueQuery = session!!.queueQuery(SimpleString(name))
|
||||
if (!queueQuery.isExists) {
|
||||
session!!.createQueue(name, name, true /* durable */)
|
||||
state.alreadyLocked {
|
||||
val name = hostAndPort.toString()
|
||||
val queueQuery = session!!.queueQuery(SimpleString(name))
|
||||
if (!queueQuery.isExists) {
|
||||
session!!.createQueue(name, name, true /* durable */)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -219,7 +293,7 @@ class ArtemisMessagingClient(directory: Path,
|
||||
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
|
||||
val handler = Handler(executor, topicSession, callback)
|
||||
handlers.add(handler)
|
||||
undeliveredMessages.removeIf { deliverMessage(it) }
|
||||
undeliveredMessages.removeIf { deliver(it) }
|
||||
return handler
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@ interface AffinityExecutor : Executor {
|
||||
execute(runnable)
|
||||
}
|
||||
|
||||
// TODO: Rename this to executeWithResult
|
||||
/**
|
||||
* Runs the given function on the executor, blocking until the result is available. Be careful not to deadlock this
|
||||
* way! Make sure the executor can't possibly be waiting for the calling thread.
|
||||
|
@ -7,6 +7,7 @@ import com.r3corda.core.testing.freeLocalHostAndPort
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingServer
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.Rule
|
||||
@ -15,9 +16,8 @@ import org.junit.rules.TemporaryFolder
|
||||
import java.net.ServerSocket
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertNull
|
||||
|
||||
class ArtemisMessagingTests {
|
||||
@ -65,8 +65,9 @@ class ArtemisMessagingTests {
|
||||
|
||||
createMessagingServer(serverAddress).start()
|
||||
|
||||
val messagingClient = createMessagingClient(server = invalidServerAddress)
|
||||
assertThatThrownBy { messagingClient.start() }
|
||||
messagingClient = createMessagingClient(server = invalidServerAddress)
|
||||
assertThatThrownBy { messagingClient!!.start() }
|
||||
messagingClient = null
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -83,6 +84,7 @@ class ArtemisMessagingTests {
|
||||
|
||||
val messagingClient = createMessagingClient()
|
||||
messagingClient.start()
|
||||
thread { messagingClient.run() }
|
||||
|
||||
messagingClient.addMessageHandler(topic) { message, r ->
|
||||
receivedMessages.add(message)
|
||||
@ -91,15 +93,14 @@ class ArtemisMessagingTests {
|
||||
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
|
||||
messagingClient.send(message, messagingClient.myAddress)
|
||||
|
||||
val actual = receivedMessages.poll(2, SECONDS)
|
||||
assertNotNull(actual)
|
||||
val actual: Message = receivedMessages.take()
|
||||
assertEquals("first msg", String(actual.data))
|
||||
assertNull(receivedMessages.poll(200, MILLISECONDS))
|
||||
}
|
||||
|
||||
private fun createMessagingClient(server: HostAndPort = hostAndPort,
|
||||
local: HostAndPort = hostAndPort): ArtemisMessagingClient {
|
||||
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local).apply {
|
||||
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply {
|
||||
configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
|
@ -336,11 +336,7 @@ private fun runNode(cliParams: CliParams.RunNode): Int {
|
||||
runUploadRates(cliParams.apiAddress)
|
||||
}
|
||||
|
||||
try {
|
||||
while (true) Thread.sleep(Long.MAX_VALUE)
|
||||
} catch(e: InterruptedException) {
|
||||
node.stop()
|
||||
}
|
||||
node.run()
|
||||
} catch (e: NotSetupException) {
|
||||
log.error(e.message)
|
||||
return 1
|
||||
@ -397,7 +393,7 @@ private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipi
|
||||
}
|
||||
|
||||
val node = logElapsedTime("Node startup", log) {
|
||||
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).start()
|
||||
Node(params.dir, params.networkAddress, params.apiAddress, config, networkMapId, advertisedServices, DemoClock()).setup().start()
|
||||
}
|
||||
|
||||
return node
|
||||
|
@ -2,22 +2,20 @@ package com.r3corda.demos
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.contracts.CommercialPaper
|
||||
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
|
||||
import com.r3corda.contracts.asset.cashBalances
|
||||
import com.r3corda.contracts.testing.fillWithSomeTestCash
|
||||
import com.r3corda.core.*
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.days
|
||||
import com.r3corda.core.logElapsedTime
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.seconds
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.utilities.Emoji
|
||||
import com.r3corda.core.utilities.LogHelper
|
||||
@ -40,7 +38,7 @@ import java.nio.file.Paths
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.system.exitProcess
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@ -73,10 +71,6 @@ val DEFAULT_BASE_DIRECTORY = "./build/trader-demo"
|
||||
private val log: Logger = LoggerFactory.getLogger("TraderDemo")
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
exitProcess(runTraderDemo(args))
|
||||
}
|
||||
|
||||
fun runTraderDemo(args: Array<String>): Int {
|
||||
val parser = OptionParser()
|
||||
|
||||
val roleArg = parser.accepts("role").withRequiredArg().ofType(Role::class.java).required()
|
||||
@ -90,7 +84,7 @@ fun runTraderDemo(args: Array<String>): Int {
|
||||
} catch (e: Exception) {
|
||||
log.error(e.message)
|
||||
printHelp(parser)
|
||||
return 1
|
||||
exitProcess(1)
|
||||
}
|
||||
|
||||
val role = options.valueOf(roleArg)!!
|
||||
@ -160,12 +154,13 @@ fun runTraderDemo(args: Array<String>): Int {
|
||||
if (role == Role.BUYER) {
|
||||
runBuyer(node, amount)
|
||||
} else {
|
||||
node.networkMapRegistrationFuture.get()
|
||||
val party = node.netMapCache.getNodeByLegalName("Bank A")?.identity ?: throw IllegalStateException("Cannot find other node?!")
|
||||
runSeller(node, amount, party)
|
||||
node.networkMapRegistrationFuture.success {
|
||||
val party = node.netMapCache.getNodeByLegalName("Bank A")?.identity ?: throw IllegalStateException("Cannot find other node?!")
|
||||
runSeller(node, amount, party)
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
node.run()
|
||||
}
|
||||
|
||||
private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
|
||||
@ -182,18 +177,20 @@ private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
|
||||
}
|
||||
}
|
||||
|
||||
var tradeTX: SignedTransaction? = null
|
||||
val tradeTX: ListenableFuture<SignedTransaction>
|
||||
if (node.isPreviousCheckpointsPresent) {
|
||||
node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).forEach {
|
||||
tradeTX = it.second.get()
|
||||
}
|
||||
tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second
|
||||
} else {
|
||||
val seller = TraderDemoProtocolSeller(otherSide, amount)
|
||||
tradeTX = node.smm.add("demo.seller", seller).get()
|
||||
tradeTX = node.smm.add("demo.seller", seller)
|
||||
}
|
||||
println("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(tradeTX!!.tx)}")
|
||||
|
||||
node.stop()
|
||||
tradeTX.success {
|
||||
println("Sale completed - we have a happy customer!\n\nFinal transaction is:\n\n${Emoji.renderIfSupported(it.tx)}")
|
||||
thread {
|
||||
node.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun runBuyer(node: Node, amount: Amount<Currency>) {
|
||||
@ -222,13 +219,11 @@ private fun runBuyer(node: Node, amount: Amount<Currency>) {
|
||||
val buyer = TraderDemoProtocolBuyer(otherSide, attachmentsPath, amount)
|
||||
node.smm.add("demo.buyer", buyer)
|
||||
}
|
||||
|
||||
CountDownLatch(1).await() // Prevent the application from terminating
|
||||
}
|
||||
|
||||
// We create a couple of ad-hoc test protocols that wrap the two party trade protocol, to give us the demo logic.
|
||||
|
||||
val DEMO_TOPIC = "initiate.demo.trade"
|
||||
private val DEMO_TOPIC = "initiate.demo.trade"
|
||||
|
||||
private class TraderDemoProtocolBuyer(val otherSide: Party,
|
||||
private val attachmentsPath: Path,
|
||||
|
Loading…
x
Reference in New Issue
Block a user