Merged in artemis-client-service (pull request #267)

Split up ArtemisMessaging into client & service
This commit is contained in:
Andrius Dagys 2016-08-05 14:12:50 +01:00
commit 5c6728b5fd
13 changed files with 630 additions and 524 deletions

View File

@ -2,7 +2,6 @@ package com.r3corda.node.internal
import com.codahale.metrics.JmxReporter import com.codahale.metrics.JmxReporter
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
@ -10,18 +9,15 @@ import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.servlets.AttachmentDownloadServlet import com.r3corda.node.servlets.AttachmentDownloadServlet
import com.r3corda.node.servlets.Config import com.r3corda.node.servlets.Config
import com.r3corda.node.servlets.DataUploadServlet import com.r3corda.node.servlets.DataUploadServlet
import com.r3corda.node.servlets.ResponseFilter import com.r3corda.node.servlets.ResponseFilter
import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerCollection import org.eclipse.jetty.server.handler.HandlerCollection
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
import org.eclipse.jetty.servlet.DefaultServlet import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.servlet.ServletContextHandler import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder import org.eclipse.jetty.servlet.ServletHolder
@ -55,9 +51,10 @@ class ConfigurationException(message: String) : Exception(message)
* but nodes are not required to advertise services they run (hence subset). * but nodes are not required to advertise services they run (hence subset).
* @param clock The clock used within the node and by all protocols etc. * @param clock The clock used within the node and by all protocols etc.
*/ */
class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, configuration: NodeConfiguration, class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
networkMapAddress: NodeInfo?, advertisedServices: Set<ServiceType>, configuration: NodeConfiguration, networkMapAddress: NodeInfo?,
clock: Clock = NodeClock()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) { advertisedServices: Set<ServiceType>, clock: Clock = NodeClock(),
val messagingServerAddr: HostAndPort? = null) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
companion object { companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */ /** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
val DEFAULT_PORT = 31337 val DEFAULT_PORT = 31337
@ -68,17 +65,31 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1) override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)
lateinit var webServer: Server lateinit var webServer: Server
var messageBroker: ArtemisMessagingServer? = null
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us // Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice. // when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null private var nodeFileLock: FileLock? = null
override fun makeMessagingService(): MessagingServiceInternal = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread) override fun makeMessagingService(): MessagingServiceInternal {
val serverAddr = messagingServerAddr ?: {
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr)
p2pAddr
}()
return ArtemisMessagingClient(dir, configuration, serverAddr, p2pAddr, serverThread)
}
override fun startMessagingService() { override fun startMessagingService() {
// Start up the MQ service. // Start up the embedded MQ server
(net as ArtemisMessagingService).apply { messageBroker?.apply {
configureWithDevSSLCertificate() // TODO Create proper certificate provisioning process configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process
start()
}
// Start up the MQ client.
(net as ArtemisMessagingClient).apply {
configureWithDevSSLCertificate() // TODO: Client might need a separate certificate
start() start()
} }
} }
@ -179,6 +190,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
override fun stop() { override fun stop() {
webServer.stop() webServer.stop()
super.stop() super.stop()
messageBroker?.stop()
nodeFileLock!!.release() nodeFileLock!!.release()
serverThread.shutdownNow() serverThread.shutdownNow()
} }

View File

@ -3,7 +3,6 @@ package com.r3corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
@ -11,10 +10,6 @@ import com.r3corda.core.protocols.ProtocolLogicRefFactory
interface MessagingServiceInternal: MessagingService { interface MessagingServiceInternal: MessagingService {
fun stop() fun stop()
// Allow messaging service to be signalled by the NetworkMapCache about Nodes
// Thus providing an opportunity to permission the other Node and possibly to setup a link
fun registerTrustedAddress(address: SingleMessageRecipient)
} }
/** /**

View File

@ -0,0 +1,238 @@
package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.ThreadBox
import com.r3corda.core.messaging.*
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.Node
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import java.nio.file.FileSystems
import java.nio.file.Path
import java.time.Instant
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe
/**
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
* Artemis is a message queue broker and here we run a client connecting to the specified broker instance [ArtemisMessagingServer]
*
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
* @param myHostPort What host and port to use as an address for incoming messages
* @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified.
*/
@ThreadSafe
class ArtemisMessagingClient(directory: Path,
config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myHostPort: HostAndPort,
val defaultExecutor: Executor = RunOnCallerThread) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
companion object {
val log = loggerFor<ArtemisMessagingClient>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
// confusion.
val TOPIC_PROPERTY = "platform-topic"
val SESSION_ID_PROPERTY = "session-id"
/** Temp helper until network map is established. */
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname))
fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT)
}
private class InnerState {
var running = false
val producers = HashMap<Address, ClientProducer>()
}
/** A registration to handle messages of different types */
data class Handler(val executor: Executor?,
val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
override val myAddress: SingleMessageRecipient = Address(myHostPort)
private val mutex = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
private lateinit var clientFactory: ClientSessionFactory
private var session: ClientSession? = null
private var consumer: ClientConsumer? = null
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
private val undeliveredMessages = CopyOnWriteArrayList<Message>()
init {
require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
fun start() = mutex.locked {
if (!running) {
configureAndStartClient()
running = true
}
}
private fun configureAndStartClient() {
log.info("Connecting to server: $serverHostPort")
// Connect to our server.
clientFactory = ActiveMQClient.createServerLocatorWithoutHA(
tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)).createSessionFactory()
// Create a queue on which to receive messages and set up the handler.
val session = clientFactory.createSession()
this.session = session
val address = myHostPort.toString()
val queueName = myHostPort.toString()
session.createQueue(address, queueName, false)
consumer = session.createConsumer(queueName).setMessageHandler { message: ClientMessage -> handleIncomingMessage(message) }
session.start()
}
private fun handleIncomingMessage(message: ClientMessage) {
// This code runs for every inbound message.
try {
if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
return
}
if (!message.containsProperty(SESSION_ID_PROPERTY)) {
log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
return
}
val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val msg = object : Message {
override val topicSession = TopicSession(topic, sessionID)
override val data: ByteArray = body
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val debugMessageID: String = message.messageID.toString()
override fun serialise(): ByteArray = body
override fun toString() = topic + "#" + String(data)
}
deliverMessage(msg)
} finally {
// TODO the message is delivered onto an executor and so we may be acking the message before we've
// finished processing it
message.acknowledge()
}
}
private fun deliverMessage(msg: Message): Boolean {
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty()) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
// This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
undeliveredMessages += msg
return false
}
for (handler in deliverTo) {
(handler.executor ?: defaultExecutor).execute {
try {
handler.callback(msg, handler)
} catch(e: Exception) {
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
}
}
}
return true
}
override fun stop() = mutex.locked {
for (producer in producers.values) producer.close()
producers.clear()
consumer?.close()
session?.close()
// We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
running = false
}
override fun send(message: Message, target: MessageRecipients) {
if (target !is Address)
TODO("Only simple sends to single recipients are currently implemented")
val artemisMessage = session!!.createMessage(true).apply {
val sessionID = message.topicSession.sessionID
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
}
getProducerForAddress(target).send(artemisMessage)
}
private fun getProducerForAddress(address: Address): ClientProducer {
return mutex.locked {
producers.getOrPut(address) {
if (address != myAddress) {
maybeCreateQueue(address.hostAndPort)
}
session!!.createProducer(address.hostAndPort.toString())
}
}
}
private fun maybeCreateQueue(hostAndPort: HostAndPort) {
val name = hostAndPort.toString()
val queueQuery = session!!.queueQuery(SimpleString(name))
if (!queueQuery.isExists) {
session!!.createQueue(name, name, true /* durable */)
}
}
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
override fun addMessageHandler(topicSession: TopicSession,
executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(executor, topicSession, callback)
handlers.add(handler)
undeliveredMessages.removeIf { deliverMessage(it) }
return handler
}
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
handlers.remove(registration)
}
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
override fun toString() = topicSession.toString() + "#" + String(data)
}
}
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
}

View File

@ -0,0 +1,95 @@
package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.node.services.config.NodeConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Files
import java.nio.file.Path
/**
* The base class for Artemis services that defines shared data structures and transport configuration
*
* @param directory A place where Artemis can stash its message journal and other files.
* @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore
*/
abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeConfiguration) : SingletonSerializeAsToken() {
private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
// In future: can contain onion routing info, etc.
protected data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
protected enum class ConnectionDirection { INBOUND, OUTBOUND }
// Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher.
// Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work
// in case we need to use keytool certificates in some demos
private val CIPHER_SUITES = listOf(
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_DSS_WITH_AES_128_GCM_SHA256")
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt(),
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
// and AES encryption
TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to keyStorePath,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStorePath,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
)
)
/**
* Strictly for dev only automatically construct a server certificate/private key signed from
* the CA certs in Node resources. Then provision KeyStores into certificates folder under node path.
*/
fun configureWithDevSSLCertificate() {
val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
Files.createDirectories(directory.resolve("certificates"))
if (!Files.exists(trustStorePath)) {
Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"),
trustStorePath)
}
if (!Files.exists(keyStorePath)) {
val caKeyStore = X509Utilities.loadKeyStore(
javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordadevcakeys.jks"),
"cordacadevpass")
X509Utilities.createKeystoreForSSL(keyStorePath, config.keyStorePassword, config.keyStorePassword, caKeyStore, "cordacadevkeypass")
}
}
}

View File

@ -0,0 +1,144 @@
package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.config.NodeConfiguration
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
import java.math.BigInteger
import java.nio.file.Path
import javax.annotation.concurrent.ThreadSafe
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
/**
* This class configures and manages an Apache Artemis message queue broker.
*
* Nodes communication is managed using an Artemis specific protocol, but it supports other protocols like AMQP/1.0
* as well for interop.
*
* The current implementation is skeletal and lacks features like security or firewall tunnelling (that is, you must
* be able to receive TCP connections in order to receive messages). It is good enough for local communication within
* a fully connected network, trusted network or on localhost.
*/
@ThreadSafe
class ArtemisMessagingServer(directory: Path,
config: NodeConfiguration,
val myHostPort: HostAndPort) : ArtemisMessagingComponent(directory, config) {
companion object {
val log = loggerFor<ArtemisMessagingServer>()
}
private class InnerState {
var running = false
}
val myAddress: SingleMessageRecipient = Address(myHostPort)
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
running = true
}
}
fun stop() = mutex.locked {
activeMQServer.stop()
running = false
}
private fun configureAndStartServer() {
val config = createArtemisConfig(directory, myHostPort).apply {
securityRoles = mapOf(
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
)
}
val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(config, securityManager).apply {
// Throw any exceptions which are detected during startup
registerActivationFailureListener { exception -> throw exception }
// Deploy bridge for a newly created queue
registerPostQueueCreationCallback { queueName ->
log.trace("Queue created: $queueName")
maybeDeployBridgeForAddress(queueName.toString())
}
}
activeMQServer.start()
}
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
val config = ConfigurationImpl()
setConfigDirectories(config, directory)
// We will be talking to our server purely in memory.
config.acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
)
return config
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
// TODO: set up proper security configuration https://r3-cev.atlassian.net/browse/COR-307
val securityConfig = SecurityConfiguration().apply {
addUser("internal", BigInteger(128, newSecureRandom()).toString(16))
addRole("internal", "internal")
defaultUser = "internal"
}
return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
}
/**
* For every queue created we need to have a bridge deployed in case the address of the queue
* is that of a remote party
*/
private fun maybeDeployBridgeForAddress(name: String) {
val hostAndPort = HostAndPort.fromString(name)
fun connectorExists() = name in activeMQServer.configuration.connectorConfigurations
fun addConnector() = activeMQServer.configuration.addConnectorConfiguration(
name,
tcpTransport(
ConnectionDirection.OUTBOUND,
hostAndPort.hostText,
hostAndPort.port
)
)
fun deployBridge() = activeMQServer.deployBridge(BridgeConfiguration().apply {
setName(name)
queueName = name
forwardingAddress = name
staticConnectors = listOf(name)
confirmationWindowSize = 100000 // a guess
})
if (!connectorExists()) {
addConnector()
deployBridge()
}
}
private fun setConfigDirectories(config: Configuration, dir: Path) {
config.apply {
bindingsDirectory = dir.resolve("bindings").toString()
journalDirectory = dir.resolve("journal").toString()
largeMessagesDirectory = dir.resolve("largemessages").toString()
}
}
}

View File

@ -1,414 +0,0 @@
package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.WhitelistTrustManagerProvider
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.crypto.registerWhitelistTrustManager
import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.Node
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.*
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
import java.math.BigInteger
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.time.Instant
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
/**
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
* Artemis is a message queue broker and here, we embed the entire server inside our own process. Nodes communicate
* with each other using an Artemis specific protocol, but it supports other protocols like AMQP/1.0
* as well for interop.
*
* The current implementation is skeletal and lacks features like security or firewall tunnelling (that is, you must
* be able to receive TCP connections in order to receive messages). It is good enough for local communication within
* a fully connected network, trusted network or on localhost.
*
* @param directory A place where Artemis can stash its message journal and other files.
* @param myHostPort What host and port to bind to for receiving inbound connections.
* @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore
* @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified.
*/
@ThreadSafe
class ArtemisMessagingService(val directory: Path,
val myHostPort: HostAndPort,
val config: NodeConfiguration,
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingServiceInternal {
// In future: can contain onion routing info, etc.
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
companion object {
init {
// Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable
// certificate hosts manually.
registerWhitelistTrustManager()
}
val log = loggerFor<ArtemisMessagingService>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
// confusion.
val TOPIC_PROPERTY = "platform-topic"
val SESSION_ID_PROPERTY = "session-id"
/** Temp helper until network map is established. */
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname))
fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT)
}
private lateinit var activeMQServer: ActiveMQServer
private lateinit var clientFactory: ClientSessionFactory
private var session: ClientSession? = null
private var inboundConsumer: ClientConsumer? = null
private class InnerState {
var running = false
val sendClients = HashMap<Address, ClientProducer>()
}
private val mutex = ThreadBox(InnerState())
/** A registration to handle messages of different types */
inner class Handler(val executor: Executor?,
val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
private val handlers = CopyOnWriteArrayList<Handler>()
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
private val undeliveredMessages = CopyOnWriteArrayList<Message>()
private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
// Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher.
// Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work
// in case we need to use keytool certificates in some demos
private val CIPHER_SUITES = listOf(
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_RSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_DSS_WITH_AES_128_GCM_SHA256")
init {
require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
private fun getSendClient(address: Address): ClientProducer {
return mutex.locked {
sendClients.getOrPut(address) {
if (address != myAddress) {
maybeSetupConnection(address.hostAndPort)
}
session!!.createProducer(address.hostAndPort.toString())
}
}
}
fun start() {
// Wire up various bits of configuration. This is so complicated because Artemis is an embedded message queue
// server. Thus we're running both a "server" and a "client" in the same JVM process. A future node might be
// able to use an external MQ server instead, for instance, if a bank already has an MQ setup and wishes to
// reuse it, or if it makes sense for scaling to split the functionality out, or if it makes sense for security.
//
// But for now, we bundle it all up into one thing.
val config = createArtemisConfig(directory, myHostPort).apply {
securityRoles = mapOf(
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
)
}
val securityConfig = SecurityConfiguration().apply {
addUser("internal", BigInteger(128, newSecureRandom()).toString(16))
addRole("internal", "internal")
defaultUser = "internal"
}
val securityManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
activeMQServer = ActiveMQServerImpl(config, securityManager)
// Throw any exceptions which are detected during startup
activeMQServer.registerActivationFailureListener { exception -> throw exception }
activeMQServer.start()
// Connect to our server.
clientFactory = ActiveMQClient.createServerLocatorWithoutHA(
tcpTransport(ConnectionDirection.OUTBOUND, myHostPort.hostText, myHostPort.port)).createSessionFactory()
// Create a queue on which to receive messages and set up the handler.
val session = clientFactory.createSession()
this.session = session
session.createQueue(myHostPort.toString(), "inbound", false)
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
// This code runs for every inbound message.
try {
if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
return@setMessageHandler
}
if (!message.containsProperty(SESSION_ID_PROPERTY)) {
log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
return@setMessageHandler
}
val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val msg = object : Message {
override val topicSession = TopicSession(topic, sessionID)
override val data: ByteArray = body
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val debugMessageID: String = message.messageID.toString()
override fun serialise(): ByteArray = body
override fun toString() = topic + "#" + String(data)
}
deliverMessage(msg)
} finally {
// TODO the message is delivered onto an executor and so we may be acking the message before we've
// finished processing it
message.acknowledge()
}
}
session.start()
mutex.locked { running = true }
}
private fun deliverMessage(msg: Message): Boolean {
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession }
if (deliverTo.isEmpty()) {
// This should probably be downgraded to a trace in future, so the protocol can evolve with new topics
// without causing log spam.
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
// This is a hack; transient messages held in memory isn't crash resistant.
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
undeliveredMessages += msg
return false
}
for (handler in deliverTo) {
(handler.executor ?: defaultExecutor).execute {
try {
handler.callback(msg, handler)
} catch(e: Exception) {
log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e)
}
}
}
return true
}
override fun stop() {
mutex.locked {
for (producer in sendClients.values)
producer.close()
sendClients.clear()
inboundConsumer?.close()
session?.close()
activeMQServer.stop()
// We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
running = false
}
}
override fun registerTrustedAddress(address: SingleMessageRecipient) {
require(address is Address) { "Address is not an Artemis Message Address" }
val hostName = (address as Address).hostAndPort.hostText
WhitelistTrustManagerProvider.addWhitelistEntry(hostName)
}
override fun send(message: Message, target: MessageRecipients) {
if (target !is Address)
TODO("Only simple sends to single recipients are currently implemented")
val artemisMessage = session!!.createMessage(true).apply {
val sessionID = message.topicSession.sessionID
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
}
getSendClient(target).send(artemisMessage)
}
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), executor, callback)
override fun addMessageHandler(topicSession: TopicSession,
executor: Executor?,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(executor, topicSession, callback)
handlers.add(handler)
undeliveredMessages.removeIf { deliverMessage(it) }
return handler
}
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
handlers.remove(registration)
}
override fun createMessage(topicSession: TopicSession, data: ByteArray): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = Instant.now().toEpochMilli().toString()
override fun toString() = topicSession.toString() + "#" + String(data)
}
}
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
override val myAddress: SingleMessageRecipient = Address(myHostPort)
private enum class ConnectionDirection { INBOUND, OUTBOUND }
private fun maybeSetupConnection(hostAndPort: HostAndPort) {
val name = hostAndPort.toString()
// To make ourselves talk to a remote server, we need a "bridge". Bridges are things inside Artemis that know how
// to handle remote machines going away temporarily, retry connections, etc. They're the bit that handles
// unreliable peers. Thus, we need one bridge per node we are talking to.
//
// Each bridge consumes from a queue on our end and forwards messages to a queue on their end. So for each node
// we must create a queue, then create and configure a bridge.
//
// Note that bridges are not two way. A having a bridge to B does not imply that B can connect back to A. This
// becomes important for cases like firewall tunnelling and connection proxying where connectivity is not
// entirely duplex. The Artemis team may add this functionality in future:
//
// https://issues.apache.org/jira/browse/ARTEMIS-355
if (!session!!.queueQuery(SimpleString(name)).isExists) {
session!!.createQueue(name, name, true /* durable */)
}
if (!activeMQServer.configuration.connectorConfigurations.containsKey(name)) {
activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND,
hostAndPort.hostText, hostAndPort.port))
activeMQServer.deployBridge(BridgeConfiguration().apply {
setName(name)
queueName = name
forwardingAddress = name
staticConnectors = listOf(name)
confirmationWindowSize = 100000 // a guess
})
}
}
private fun setConfigDirectories(config: Configuration, dir: Path) {
config.apply {
bindingsDirectory = dir.resolve("bindings").toString()
journalDirectory = dir.resolve("journal").toString()
largeMessagesDirectory = dir.resolve("largemessages").toString()
}
}
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
val config = ConfigurationImpl()
setConfigDirectories(config, directory)
// We will be talking to our server purely in memory.
config.acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
)
return config
}
private fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
HOST_PROP_NAME to host,
PORT_PROP_NAME to port.toInt(),
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
PROTOCOLS_PROP_NAME to "CORE,AMQP",
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
// and AES encryption
SSL_ENABLED_PROP_NAME to true,
KEYSTORE_PROVIDER_PROP_NAME to "JKS",
KEYSTORE_PATH_PROP_NAME to keyStorePath,
KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
TRUSTSTORE_PATH_PROP_NAME to trustStorePath,
TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
NEED_CLIENT_AUTH_PROP_NAME to true
)
)
/**
* Strictly for dev only automatically construct a server certificate/private key signed from
* the CA certs in Node resources. Then provision KeyStores into certificates folder under node path.
*/
fun configureWithDevSSLCertificate() {
Files.createDirectories(directory.resolve("certificates"))
if (!Files.exists(trustStorePath)) {
Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"),
trustStorePath)
}
if (!Files.exists(keyStorePath)) {
val caKeyStore = X509Utilities.loadKeyStore(
javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordadevcakeys.jks"),
"cordacadevpass")
X509Utilities.createKeystoreForSSL(keyStorePath, config.keyStorePassword, config.keyStorePassword, caKeyStore, "cordacadevkeypass")
}
}
}

View File

@ -12,6 +12,7 @@ import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceBuilder import com.r3corda.node.services.api.MessagingServiceBuilder
import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.network.InMemoryMessagingNetwork.InMemoryMessaging
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -223,8 +224,6 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
} }
} }
override fun registerTrustedAddress(address: SingleMessageRecipient) {}
override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), executor, callback) = addMessageHandler(TopicSession(topic, sessionID), executor, callback)

View File

@ -100,7 +100,6 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) :
override fun addNode(node: NodeInfo) { override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node registeredNodes[node.identity] = node
netInternal?.registerTrustedAddress(node.address)
_changed.onNext(MapChange(node, MapChangeType.Added)) _changed.onNext(MapChange(node, MapChangeType.Added))
} }

View File

@ -1,78 +0,0 @@
package com.r3corda.node.services
import com.r3corda.core.messaging.Message
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.testing.freeLocalHostAndPort
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.net.ServerSocket
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
class ArtemisMessagingServiceTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()
val hostAndPort = freeLocalHostAndPort()
val topic = "platform.self"
var messagingNetwork: ArtemisMessagingService? = null
@After
fun cleanUp() {
messagingNetwork?.stop()
}
@Test
fun `starting with the port already bound`() {
ServerSocket(hostAndPort.port).use {
val messagingNetwork = createMessagingService()
assertThatThrownBy { messagingNetwork.start() }
}
}
@Test
fun `sending message to self`() {
val receivedMessages = LinkedBlockingQueue<Message>()
val messagingNetwork = createMessagingService()
messagingNetwork.start()
messagingNetwork.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
val message = messagingNetwork.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingNetwork.send(message, messagingNetwork.myAddress)
val actual = receivedMessages.poll(2, SECONDS)
assertNotNull(actual)
assertEquals("first msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
private fun createMessagingService(): ArtemisMessagingService {
val config = object: NodeConfiguration {
override val myLegalName: String = "me"
override val exportJMXto: String = ""
override val nearestCity: String = "London"
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
return ArtemisMessagingService(temporaryFolder.newFolder().toPath(), hostAndPort, config).apply {
configureWithDevSSLCertificate()
messagingNetwork = this
}
}
}

View File

@ -0,0 +1,114 @@
package com.r3corda.node.services
import com.google.common.net.HostAndPort
import com.r3corda.core.messaging.Message
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.testing.freeLocalHostAndPort
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.net.ServerSocket
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()
val hostAndPort = freeLocalHostAndPort()
val topic = "platform.self"
val config = object : NodeConfiguration {
override val myLegalName: String = "me"
override val exportJMXto: String = ""
override val nearestCity: String = "London"
override val keyStorePassword: String = "testpass"
override val trustStorePassword: String = "trustpass"
}
var messagingClient: ArtemisMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
@After
fun cleanUp() {
messagingClient?.stop()
messagingServer?.stop()
}
@Test
fun `server starting with the port already bound should throw`() {
ServerSocket(hostAndPort.port).use {
val messagingServer = createMessagingServer()
assertThatThrownBy { messagingServer.start() }
}
}
@Test
fun `client should connect to remote server`() {
val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start()
}
@Test
fun `client should throw if remote server not found`() {
val serverAddress = freeLocalHostAndPort()
val invalidServerAddress = freeLocalHostAndPort()
createMessagingServer(serverAddress).start()
val messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient.start() }
}
@Test
fun `client should connect to local server`() {
createMessagingServer().start()
createMessagingClient().start()
}
@Test
fun `client should be able to send message to itself`() {
val receivedMessages = LinkedBlockingQueue<Message>()
createMessagingServer().start()
val messagingClient = createMessagingClient()
messagingClient.start()
messagingClient.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val actual = receivedMessages.poll(2, SECONDS)
assertNotNull(actual)
assertEquals("first msg", String(actual.data))
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
private fun createMessagingClient(server: HostAndPort = hostAndPort,
local: HostAndPort = hostAndPort): ArtemisMessagingClient {
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local).apply {
configureWithDevSSLCertificate()
messagingClient = this
}
}
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local).apply {
configureWithDevSSLCertificate()
messagingServer = this
}
}
}

View File

@ -14,19 +14,24 @@ import com.r3corda.demos.api.InterestRateSwapAPI
import com.r3corda.demos.protocols.AutoOfferProtocol import com.r3corda.demos.protocols.AutoOfferProtocol
import com.r3corda.demos.protocols.ExitServerProtocol import com.r3corda.demos.protocols.ExitServerProtocol
import com.r3corda.demos.protocols.UpdateBusinessDayProtocol import com.r3corda.demos.protocols.UpdateBusinessDayProtocol
import com.r3corda.demos.utilities.postJson
import com.r3corda.demos.utilities.putJson
import com.r3corda.demos.utilities.uploadFile
import com.r3corda.node.internal.AbstractNode import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.internal.testing.MockNetwork import com.r3corda.node.internal.testing.MockNetwork
import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import joptsimple.OptionParser import joptsimple.OptionParser
import joptsimple.OptionSet import joptsimple.OptionSet
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File import java.io.File
import java.net.URL import java.net.URL
import java.nio.file.Files import java.nio.file.Files
@ -35,9 +40,6 @@ import java.nio.file.Paths
import java.util.* import java.util.*
import kotlin.concurrent.fixedRateTimer import kotlin.concurrent.fixedRateTimer
import kotlin.system.exitProcess import kotlin.system.exitProcess
import com.r3corda.demos.utilities.*
import org.slf4j.Logger
import org.slf4j.LoggerFactory
// IRS DEMO // IRS DEMO
// //
@ -378,7 +380,7 @@ private fun runTrade(cliParams: CliParams.Trade): Int {
private fun createRecipient(addr: String) : SingleMessageRecipient { private fun createRecipient(addr: String) : SingleMessageRecipient {
val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT) val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT)
return ArtemisMessagingService.makeRecipient(hostAndPort) return ArtemisMessagingClient.makeRecipient(hostAndPort)
} }
private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient) : Node { private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient) : Node {

View File

@ -14,7 +14,7 @@ import com.r3corda.core.utilities.Emoji
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.NotaryService
import com.r3corda.protocols.RatesFixProtocol import com.r3corda.protocols.RatesFixProtocol
@ -56,12 +56,12 @@ fun main(args: Array<String>) {
BriefLogFormatter.initVerbose("+demo.ratefix", "-org.apache.activemq") BriefLogFormatter.initVerbose("+demo.ratefix", "-org.apache.activemq")
val dir = Paths.get(options.valueOf(dirArg)) val dir = Paths.get(options.valueOf(dirArg))
val networkMapAddr = ArtemisMessagingService.makeRecipient(options.valueOf(networkMapAddrArg)) val networkMapAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(networkMapAddrArg))
val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>() val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>()
val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type)) val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type))
// Load oracle stuff (in lieu of having a network map service) // Load oracle stuff (in lieu of having a network map service)
val oracleAddr = ArtemisMessagingService.makeRecipient(options.valueOf(oracleAddrArg)) val oracleAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(oracleAddrArg))
val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>() val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>()
val oracleNode = NodeInfo(oracleAddr, oracleIdentity) val oracleNode = NodeInfo(oracleAddr, oracleIdentity)
@ -71,7 +71,7 @@ fun main(args: Array<String>) {
// Bring up node. // Bring up node.
val advertisedServices: Set<ServiceType> = emptySet() val advertisedServices: Set<ServiceType> = emptySet()
val myNetAddr = ArtemisMessagingService.toHostAndPort(options.valueOf(networkAddressArg)) val myNetAddr = ArtemisMessagingClient.toHostAndPort(options.valueOf(networkAddressArg))
val config = object : NodeConfiguration { val config = object : NodeConfiguration {
override val myLegalName: String = "Rate fix demo node" override val myLegalName: String = "Rate fix demo node"
override val exportJMXto: String = "http" override val exportJMXto: String = "http"

View File

@ -24,7 +24,7 @@ import com.r3corda.core.utilities.Emoji
import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.messaging.ArtemisMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
@ -147,7 +147,7 @@ fun runTraderDemo(args: Array<String>): Int {
val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public") val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public")
val party = Files.readAllBytes(path).deserialize<Party>() val party = Files.readAllBytes(path).deserialize<Party>()
advertisedServices = emptySet() advertisedServices = emptySet()
NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type)) NodeInfo(ArtemisMessagingClient.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
} }
// And now construct then start the node object. It takes a little while. // And now construct then start the node object. It takes a little while.