diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 74743b48e9..9958566634 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -2,7 +2,6 @@ package com.r3corda.node.internal import com.codahale.metrics.JmxReporter import com.google.common.net.HostAndPort -import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.ServiceType @@ -10,18 +9,15 @@ import com.r3corda.core.utilities.loggerFor import com.r3corda.node.serialization.NodeClock import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingClient +import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.servlets.AttachmentDownloadServlet import com.r3corda.node.servlets.Config import com.r3corda.node.servlets.DataUploadServlet import com.r3corda.node.servlets.ResponseFilter import com.r3corda.node.utilities.AffinityExecutor -import org.eclipse.jetty.server.Handler import org.eclipse.jetty.server.Server -import org.eclipse.jetty.server.handler.DefaultHandler import org.eclipse.jetty.server.handler.HandlerCollection -import org.eclipse.jetty.server.handler.HandlerList -import org.eclipse.jetty.server.handler.ResourceHandler import org.eclipse.jetty.servlet.DefaultServlet import org.eclipse.jetty.servlet.ServletContextHandler import org.eclipse.jetty.servlet.ServletHolder @@ -55,9 +51,10 @@ class ConfigurationException(message: String) : Exception(message) * but nodes are not required to advertise services they run (hence subset). * @param clock The clock used within the node and by all protocols etc. */ -class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, configuration: NodeConfiguration, - networkMapAddress: NodeInfo?, advertisedServices: Set, - clock: Clock = NodeClock()) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) { +class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, + configuration: NodeConfiguration, networkMapAddress: NodeInfo?, + advertisedServices: Set, clock: Clock = NodeClock(), + val messagingServerAddr: HostAndPort? = null) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) { companion object { /** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */ val DEFAULT_PORT = 31337 @@ -68,17 +65,31 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1) lateinit var webServer: Server + var messageBroker: ArtemisMessagingServer? = null // Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us // when our process shuts down, but we try in stop() anyway just to be nice. private var nodeFileLock: FileLock? = null - override fun makeMessagingService(): MessagingServiceInternal = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread) + override fun makeMessagingService(): MessagingServiceInternal { + val serverAddr = messagingServerAddr ?: { + messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr) + p2pAddr + }() + + return ArtemisMessagingClient(dir, configuration, serverAddr, p2pAddr, serverThread) + } override fun startMessagingService() { - // Start up the MQ service. - (net as ArtemisMessagingService).apply { - configureWithDevSSLCertificate() // TODO Create proper certificate provisioning process + // Start up the embedded MQ server + messageBroker?.apply { + configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process + start() + } + + // Start up the MQ client. + (net as ArtemisMessagingClient).apply { + configureWithDevSSLCertificate() // TODO: Client might need a separate certificate start() } } @@ -179,6 +190,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, override fun stop() { webServer.stop() super.stop() + messageBroker?.stop() nodeFileLock!!.release() serverThread.shutdownNow() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt index 3fe120b410..3d5ce9c33e 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt @@ -3,7 +3,6 @@ package com.r3corda.node.services.api import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.protocols.ProtocolLogic @@ -11,10 +10,6 @@ import com.r3corda.core.protocols.ProtocolLogicRefFactory interface MessagingServiceInternal: MessagingService { fun stop() - - // Allow messaging service to be signalled by the NetworkMapCache about Nodes - // Thus providing an opportunity to permission the other Node and possibly to setup a link - fun registerTrustedAddress(address: SingleMessageRecipient) } /** diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt new file mode 100644 index 0000000000..2b2f5374e1 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt @@ -0,0 +1,238 @@ +package com.r3corda.node.services.messaging + +import com.google.common.net.HostAndPort +import com.r3corda.core.RunOnCallerThread +import com.r3corda.core.ThreadBox +import com.r3corda.core.messaging.* +import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.internal.Node +import com.r3corda.node.services.api.MessagingServiceInternal +import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.* +import java.nio.file.FileSystems +import java.nio.file.Path +import java.time.Instant +import java.util.* +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Executor +import javax.annotation.concurrent.ThreadSafe + +/** + * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product. + * Artemis is a message queue broker and here we run a client connecting to the specified broker instance [ArtemisMessagingServer] + * + * @param serverHostPort The address of the broker instance to connect to (might be running in the same process) + * @param myHostPort What host and port to use as an address for incoming messages + * @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified. + */ +@ThreadSafe +class ArtemisMessagingClient(directory: Path, + config: NodeConfiguration, + val serverHostPort: HostAndPort, + val myHostPort: HostAndPort, + val defaultExecutor: Executor = RunOnCallerThread) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal { + companion object { + val log = loggerFor() + + // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". + // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint + // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid + // confusion. + val TOPIC_PROPERTY = "platform-topic" + + val SESSION_ID_PROPERTY = "session-id" + + /** Temp helper until network map is established. */ + fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort) + + fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname)) + fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT) + } + + private class InnerState { + var running = false + val producers = HashMap() + } + + /** A registration to handle messages of different types */ + data class Handler(val executor: Executor?, + val topicSession: TopicSession, + val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + + override val myAddress: SingleMessageRecipient = Address(myHostPort) + + private val mutex = ThreadBox(InnerState()) + private val handlers = CopyOnWriteArrayList() + + private lateinit var clientFactory: ClientSessionFactory + private var session: ClientSession? = null + private var consumer: ClientConsumer? = null + + // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. + private val undeliveredMessages = CopyOnWriteArrayList() + + init { + require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } + } + + fun start() = mutex.locked { + if (!running) { + configureAndStartClient() + running = true + } + } + + private fun configureAndStartClient() { + log.info("Connecting to server: $serverHostPort") + // Connect to our server. + clientFactory = ActiveMQClient.createServerLocatorWithoutHA( + tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)).createSessionFactory() + + // Create a queue on which to receive messages and set up the handler. + val session = clientFactory.createSession() + this.session = session + + val address = myHostPort.toString() + val queueName = myHostPort.toString() + session.createQueue(address, queueName, false) + consumer = session.createConsumer(queueName).setMessageHandler { message: ClientMessage -> handleIncomingMessage(message) } + session.start() + } + + private fun handleIncomingMessage(message: ClientMessage) { + // This code runs for every inbound message. + try { + if (!message.containsProperty(TOPIC_PROPERTY)) { + log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") + return + } + if (!message.containsProperty(SESSION_ID_PROPERTY)) { + log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring") + return + } + val topic = message.getStringProperty(TOPIC_PROPERTY) + val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) + + val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } + + val msg = object : Message { + override val topicSession = TopicSession(topic, sessionID) + override val data: ByteArray = body + override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) + override val debugMessageID: String = message.messageID.toString() + override fun serialise(): ByteArray = body + override fun toString() = topic + "#" + String(data) + } + + deliverMessage(msg) + } finally { + // TODO the message is delivered onto an executor and so we may be acking the message before we've + // finished processing it + message.acknowledge() + } + } + + private fun deliverMessage(msg: Message): Boolean { + // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added + // or removed whilst the filter is executing will not affect anything. + val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } + + if (deliverTo.isEmpty()) { + // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics + // without causing log spam. + log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet") + + // This is a hack; transient messages held in memory isn't crash resistant. + // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. + undeliveredMessages += msg + + return false + } + + for (handler in deliverTo) { + (handler.executor ?: defaultExecutor).execute { + try { + handler.callback(msg, handler) + } catch(e: Exception) { + log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e) + } + } + } + + return true + } + + override fun stop() = mutex.locked { + for (producer in producers.values) producer.close() + producers.clear() + consumer?.close() + session?.close() + // We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here. + running = false + } + + override fun send(message: Message, target: MessageRecipients) { + if (target !is Address) + TODO("Only simple sends to single recipients are currently implemented") + val artemisMessage = session!!.createMessage(true).apply { + val sessionID = message.topicSession.sessionID + putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) + putLongProperty(SESSION_ID_PROPERTY, sessionID) + writeBodyBufferBytes(message.data) + } + getProducerForAddress(target).send(artemisMessage) + } + + private fun getProducerForAddress(address: Address): ClientProducer { + return mutex.locked { + producers.getOrPut(address) { + if (address != myAddress) { + maybeCreateQueue(address.hostAndPort) + } + session!!.createProducer(address.hostAndPort.toString()) + } + } + } + + private fun maybeCreateQueue(hostAndPort: HostAndPort) { + val name = hostAndPort.toString() + val queueQuery = session!!.queueQuery(SimpleString(name)) + if (!queueQuery.isExists) { + session!!.createQueue(name, name, true /* durable */) + } + } + + override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, + callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + = addMessageHandler(TopicSession(topic, sessionID), executor, callback) + + override fun addMessageHandler(topicSession: TopicSession, + executor: Executor?, + callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } + val handler = Handler(executor, topicSession, callback) + handlers.add(handler) + undeliveredMessages.removeIf { deliverMessage(it) } + return handler + } + + override fun removeMessageHandler(registration: MessageHandlerRegistration) { + handlers.remove(registration) + } + + override fun createMessage(topicSession: TopicSession, data: ByteArray): Message { + // TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying. + return object : Message { + override val topicSession: TopicSession get() = topicSession + override val data: ByteArray get() = data + override val debugTimestamp: Instant = Instant.now() + override fun serialise(): ByteArray = this.serialise() + override val debugMessageID: String get() = Instant.now().toEpochMilli().toString() + override fun toString() = topicSession.toString() + "#" + String(data) + } + } + + override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message + = createMessage(TopicSession(topic, sessionID), data) +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt new file mode 100644 index 0000000000..cb190f0af2 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -0,0 +1,95 @@ +package com.r3corda.node.services.messaging + +import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.X509Utilities +import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.api.core.TransportConfiguration +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants +import java.nio.file.Files +import java.nio.file.Path + +/** + * The base class for Artemis services that defines shared data structures and transport configuration + * + * @param directory A place where Artemis can stash its message journal and other files. + * @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore + */ +abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeConfiguration) : SingletonSerializeAsToken() { + private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks") + private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks") + + // In future: can contain onion routing info, etc. + protected data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient + + protected enum class ConnectionDirection { INBOUND, OUTBOUND } + + // Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher. + // Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work + // in case we need to use keytool certificates in some demos + private val CIPHER_SUITES = listOf( + "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", + "TLS_RSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256", + "TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256", + "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", + "TLS_DHE_DSS_WITH_AES_128_GCM_SHA256") + + protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) = + TransportConfiguration( + when (direction) { + ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name + ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name + }, + mapOf( + // Basic TCP target details + TransportConstants.HOST_PROP_NAME to host, + TransportConstants.PORT_PROP_NAME to port.toInt(), + + // Turn on AMQP support, which needs the protocol jar on the classpath. + // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop + // It does not use AMQP messages for its own messages e.g. topology and heartbeats + // TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications + TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP", + + // Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake + // and AES encryption + TransportConstants.SSL_ENABLED_PROP_NAME to true, + TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS", + TransportConstants.KEYSTORE_PATH_PROP_NAME to keyStorePath, + TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password + TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS", + TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStorePath, + TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword, + TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","), + TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2", + TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true + ) + ) + + /** + * Strictly for dev only automatically construct a server certificate/private key signed from + * the CA certs in Node resources. Then provision KeyStores into certificates folder under node path. + */ + fun configureWithDevSSLCertificate() { + + val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks") + val trustStorePath = directory.resolve("certificates").resolve("truststore.jks") + + Files.createDirectories(directory.resolve("certificates")) + if (!Files.exists(trustStorePath)) { + Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"), + trustStorePath) + } + if (!Files.exists(keyStorePath)) { + val caKeyStore = X509Utilities.loadKeyStore( + javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordadevcakeys.jks"), + "cordacadevpass") + X509Utilities.createKeystoreForSSL(keyStorePath, config.keyStorePassword, config.keyStorePassword, caKeyStore, "cordacadevkeypass") + } + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt new file mode 100644 index 0000000000..9012cf5141 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt @@ -0,0 +1,144 @@ +package com.r3corda.node.services.messaging + +import com.google.common.net.HostAndPort +import com.r3corda.core.ThreadBox +import com.r3corda.core.crypto.newSecureRandom +import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.core.config.BridgeConfiguration +import org.apache.activemq.artemis.core.config.Configuration +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration +import org.apache.activemq.artemis.core.security.Role +import org.apache.activemq.artemis.core.server.ActiveMQServer +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager +import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule +import java.math.BigInteger +import java.nio.file.Path +import javax.annotation.concurrent.ThreadSafe + +// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman. +// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later) + +/** + * This class configures and manages an Apache Artemis message queue broker. + * + * Nodes communication is managed using an Artemis specific protocol, but it supports other protocols like AMQP/1.0 + * as well for interop. + * + * The current implementation is skeletal and lacks features like security or firewall tunnelling (that is, you must + * be able to receive TCP connections in order to receive messages). It is good enough for local communication within + * a fully connected network, trusted network or on localhost. + */ +@ThreadSafe +class ArtemisMessagingServer(directory: Path, + config: NodeConfiguration, + val myHostPort: HostAndPort) : ArtemisMessagingComponent(directory, config) { + companion object { + val log = loggerFor() + } + + private class InnerState { + var running = false + } + + val myAddress: SingleMessageRecipient = Address(myHostPort) + private val mutex = ThreadBox(InnerState()) + private lateinit var activeMQServer: ActiveMQServer + + fun start() = mutex.locked { + if (!running) { + configureAndStartServer() + running = true + } + } + + fun stop() = mutex.locked { + activeMQServer.stop() + running = false + } + + private fun configureAndStartServer() { + val config = createArtemisConfig(directory, myHostPort).apply { + securityRoles = mapOf( + "#" to setOf(Role("internal", true, true, true, true, true, true, true)) + ) + } + + val securityManager = createArtemisSecurityManager() + + activeMQServer = ActiveMQServerImpl(config, securityManager).apply { + // Throw any exceptions which are detected during startup + registerActivationFailureListener { exception -> throw exception } + // Deploy bridge for a newly created queue + registerPostQueueCreationCallback { queueName -> + log.trace("Queue created: $queueName") + maybeDeployBridgeForAddress(queueName.toString()) + } + } + activeMQServer.start() + } + + private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration { + val config = ConfigurationImpl() + setConfigDirectories(config, directory) + // We will be talking to our server purely in memory. + config.acceptorConfigurations = setOf( + tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port) + ) + return config + } + + private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { + // TODO: set up proper security configuration https://r3-cev.atlassian.net/browse/COR-307 + val securityConfig = SecurityConfiguration().apply { + addUser("internal", BigInteger(128, newSecureRandom()).toString(16)) + addRole("internal", "internal") + defaultUser = "internal" + } + + return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig) + } + + /** + * For every queue created we need to have a bridge deployed in case the address of the queue + * is that of a remote party + */ + private fun maybeDeployBridgeForAddress(name: String) { + val hostAndPort = HostAndPort.fromString(name) + + fun connectorExists() = name in activeMQServer.configuration.connectorConfigurations + + fun addConnector() = activeMQServer.configuration.addConnectorConfiguration( + name, + tcpTransport( + ConnectionDirection.OUTBOUND, + hostAndPort.hostText, + hostAndPort.port + ) + ) + + fun deployBridge() = activeMQServer.deployBridge(BridgeConfiguration().apply { + setName(name) + queueName = name + forwardingAddress = name + staticConnectors = listOf(name) + confirmationWindowSize = 100000 // a guess + }) + + if (!connectorExists()) { + addConnector() + deployBridge() + } + } + + private fun setConfigDirectories(config: Configuration, dir: Path) { + config.apply { + bindingsDirectory = dir.resolve("bindings").toString() + journalDirectory = dir.resolve("journal").toString() + largeMessagesDirectory = dir.resolve("largemessages").toString() + } + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt deleted file mode 100644 index c5da7a90ca..0000000000 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt +++ /dev/null @@ -1,414 +0,0 @@ -package com.r3corda.node.services.messaging - -import com.google.common.net.HostAndPort -import com.r3corda.core.RunOnCallerThread -import com.r3corda.core.ThreadBox -import com.r3corda.core.crypto.WhitelistTrustManagerProvider -import com.r3corda.core.crypto.X509Utilities -import com.r3corda.core.crypto.newSecureRandom -import com.r3corda.core.crypto.registerWhitelistTrustManager -import com.r3corda.core.messaging.* -import com.r3corda.core.serialization.SingletonSerializeAsToken -import com.r3corda.core.utilities.loggerFor -import com.r3corda.node.internal.Node -import com.r3corda.node.services.api.MessagingServiceInternal -import com.r3corda.node.services.config.NodeConfiguration -import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.TransportConfiguration -import org.apache.activemq.artemis.api.core.client.* -import org.apache.activemq.artemis.core.config.BridgeConfiguration -import org.apache.activemq.artemis.core.config.Configuration -import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl -import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.* -import org.apache.activemq.artemis.core.security.Role -import org.apache.activemq.artemis.core.server.ActiveMQServer -import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl -import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager -import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule -import java.math.BigInteger -import java.nio.file.FileSystems -import java.nio.file.Files -import java.nio.file.Path -import java.time.Instant -import java.util.* -import java.util.concurrent.CopyOnWriteArrayList -import java.util.concurrent.Executor -import javax.annotation.concurrent.ThreadSafe - -// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman. -// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later) - -/** - * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product. - * Artemis is a message queue broker and here, we embed the entire server inside our own process. Nodes communicate - * with each other using an Artemis specific protocol, but it supports other protocols like AMQP/1.0 - * as well for interop. - * - * The current implementation is skeletal and lacks features like security or firewall tunnelling (that is, you must - * be able to receive TCP connections in order to receive messages). It is good enough for local communication within - * a fully connected network, trusted network or on localhost. - * - * @param directory A place where Artemis can stash its message journal and other files. - * @param myHostPort What host and port to bind to for receiving inbound connections. - * @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore - * @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified. - */ -@ThreadSafe -class ArtemisMessagingService(val directory: Path, - val myHostPort: HostAndPort, - val config: NodeConfiguration, - val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingServiceInternal { - - // In future: can contain onion routing info, etc. - private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient - - companion object { - init { - // Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable - // certificate hosts manually. - registerWhitelistTrustManager() - } - - - val log = loggerFor() - - // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". - // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint - // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid - // confusion. - val TOPIC_PROPERTY = "platform-topic" - - val SESSION_ID_PROPERTY = "session-id" - - /** Temp helper until network map is established. */ - fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort) - fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname)) - fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT) - } - - private lateinit var activeMQServer: ActiveMQServer - private lateinit var clientFactory: ClientSessionFactory - private var session: ClientSession? = null - private var inboundConsumer: ClientConsumer? = null - - private class InnerState { - var running = false - val sendClients = HashMap() - } - - private val mutex = ThreadBox(InnerState()) - - /** A registration to handle messages of different types */ - inner class Handler(val executor: Executor?, - val topicSession: TopicSession, - val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration - - private val handlers = CopyOnWriteArrayList() - - // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. - private val undeliveredMessages = CopyOnWriteArrayList() - - private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks") - private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks") - - // Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher. - // Our self-generated certificates all use ECDSA for handshakes, but we allow classical RSA certificates to work - // in case we need to use keytool certificates in some demos - private val CIPHER_SUITES = listOf( - "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", - "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", - "TLS_RSA_WITH_AES_128_GCM_SHA256", - "TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256", - "TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256", - "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256", - "TLS_DHE_DSS_WITH_AES_128_GCM_SHA256") - - init { - require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } - } - - private fun getSendClient(address: Address): ClientProducer { - return mutex.locked { - sendClients.getOrPut(address) { - if (address != myAddress) { - maybeSetupConnection(address.hostAndPort) - } - session!!.createProducer(address.hostAndPort.toString()) - } - } - } - - fun start() { - // Wire up various bits of configuration. This is so complicated because Artemis is an embedded message queue - // server. Thus we're running both a "server" and a "client" in the same JVM process. A future node might be - // able to use an external MQ server instead, for instance, if a bank already has an MQ setup and wishes to - // reuse it, or if it makes sense for scaling to split the functionality out, or if it makes sense for security. - // - // But for now, we bundle it all up into one thing. - val config = createArtemisConfig(directory, myHostPort).apply { - securityRoles = mapOf( - "#" to setOf(Role("internal", true, true, true, true, true, true, true)) - ) - } - - val securityConfig = SecurityConfiguration().apply { - addUser("internal", BigInteger(128, newSecureRandom()).toString(16)) - addRole("internal", "internal") - defaultUser = "internal" - } - val securityManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig) - - activeMQServer = ActiveMQServerImpl(config, securityManager) - // Throw any exceptions which are detected during startup - activeMQServer.registerActivationFailureListener { exception -> throw exception } - activeMQServer.start() - - // Connect to our server. - clientFactory = ActiveMQClient.createServerLocatorWithoutHA( - tcpTransport(ConnectionDirection.OUTBOUND, myHostPort.hostText, myHostPort.port)).createSessionFactory() - - // Create a queue on which to receive messages and set up the handler. - val session = clientFactory.createSession() - this.session = session - - session.createQueue(myHostPort.toString(), "inbound", false) - inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage -> - // This code runs for every inbound message. - try { - if (!message.containsProperty(TOPIC_PROPERTY)) { - log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") - return@setMessageHandler - } - if (!message.containsProperty(SESSION_ID_PROPERTY)) { - log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring") - return@setMessageHandler - } - val topic = message.getStringProperty(TOPIC_PROPERTY) - val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) - - val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } - - val msg = object : Message { - override val topicSession = TopicSession(topic, sessionID) - override val data: ByteArray = body - override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) - override val debugMessageID: String = message.messageID.toString() - override fun serialise(): ByteArray = body - override fun toString() = topic + "#" + String(data) - } - - deliverMessage(msg) - } finally { - // TODO the message is delivered onto an executor and so we may be acking the message before we've - // finished processing it - message.acknowledge() - } - } - session.start() - - mutex.locked { running = true } - } - - private fun deliverMessage(msg: Message): Boolean { - // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added - // or removed whilst the filter is executing will not affect anything. - val deliverTo = handlers.filter { it.topicSession.isBlank() || it.topicSession == msg.topicSession } - - if (deliverTo.isEmpty()) { - // This should probably be downgraded to a trace in future, so the protocol can evolve with new topics - // without causing log spam. - log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet") - - // This is a hack; transient messages held in memory isn't crash resistant. - // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. - undeliveredMessages += msg - - return false - } - - for (handler in deliverTo) { - (handler.executor ?: defaultExecutor).execute { - try { - handler.callback(msg, handler) - } catch(e: Exception) { - log.error("Caught exception whilst executing message handler for ${msg.topicSession}", e) - } - } - } - - return true - } - - override fun stop() { - mutex.locked { - for (producer in sendClients.values) - producer.close() - sendClients.clear() - inboundConsumer?.close() - session?.close() - activeMQServer.stop() - - // We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here. - - running = false - } - } - - override fun registerTrustedAddress(address: SingleMessageRecipient) { - require(address is Address) { "Address is not an Artemis Message Address" } - val hostName = (address as Address).hostAndPort.hostText - WhitelistTrustManagerProvider.addWhitelistEntry(hostName) - } - - override fun send(message: Message, target: MessageRecipients) { - if (target !is Address) - TODO("Only simple sends to single recipients are currently implemented") - val artemisMessage = session!!.createMessage(true).apply { - val sessionID = message.topicSession.sessionID - putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) - putLongProperty(SESSION_ID_PROPERTY, sessionID) - writeBodyBufferBytes(message.data) - } - getSendClient(target).send(artemisMessage) - } - - override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, - callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration - = addMessageHandler(TopicSession(topic, sessionID), executor, callback) - - override fun addMessageHandler(topicSession: TopicSession, - executor: Executor?, - callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { - require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } - val handler = Handler(executor, topicSession, callback) - handlers.add(handler) - undeliveredMessages.removeIf { deliverMessage(it) } - return handler - } - - override fun removeMessageHandler(registration: MessageHandlerRegistration) { - handlers.remove(registration) - } - - override fun createMessage(topicSession: TopicSession, data: ByteArray): Message { - // TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying. - return object : Message { - override val topicSession: TopicSession get() = topicSession - override val data: ByteArray get() = data - override val debugTimestamp: Instant = Instant.now() - override fun serialise(): ByteArray = this.serialise() - override val debugMessageID: String get() = Instant.now().toEpochMilli().toString() - override fun toString() = topicSession.toString() + "#" + String(data) - } - } - - override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message - = createMessage(TopicSession(topic, sessionID), data) - - override val myAddress: SingleMessageRecipient = Address(myHostPort) - - private enum class ConnectionDirection { INBOUND, OUTBOUND } - - private fun maybeSetupConnection(hostAndPort: HostAndPort) { - val name = hostAndPort.toString() - - // To make ourselves talk to a remote server, we need a "bridge". Bridges are things inside Artemis that know how - // to handle remote machines going away temporarily, retry connections, etc. They're the bit that handles - // unreliable peers. Thus, we need one bridge per node we are talking to. - // - // Each bridge consumes from a queue on our end and forwards messages to a queue on their end. So for each node - // we must create a queue, then create and configure a bridge. - // - // Note that bridges are not two way. A having a bridge to B does not imply that B can connect back to A. This - // becomes important for cases like firewall tunnelling and connection proxying where connectivity is not - // entirely duplex. The Artemis team may add this functionality in future: - // - // https://issues.apache.org/jira/browse/ARTEMIS-355 - if (!session!!.queueQuery(SimpleString(name)).isExists) { - session!!.createQueue(name, name, true /* durable */) - } - if (!activeMQServer.configuration.connectorConfigurations.containsKey(name)) { - activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND, - hostAndPort.hostText, hostAndPort.port)) - activeMQServer.deployBridge(BridgeConfiguration().apply { - setName(name) - queueName = name - forwardingAddress = name - staticConnectors = listOf(name) - confirmationWindowSize = 100000 // a guess - }) - } - } - - private fun setConfigDirectories(config: Configuration, dir: Path) { - config.apply { - bindingsDirectory = dir.resolve("bindings").toString() - journalDirectory = dir.resolve("journal").toString() - largeMessagesDirectory = dir.resolve("largemessages").toString() - } - } - - private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration { - val config = ConfigurationImpl() - setConfigDirectories(config, directory) - // We will be talking to our server purely in memory. - config.acceptorConfigurations = setOf( - tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port) - ) - return config - } - - private fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) = - TransportConfiguration( - when (direction) { - ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name - ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name - }, - mapOf( - // Basic TCP target details - HOST_PROP_NAME to host, - PORT_PROP_NAME to port.toInt(), - - // Turn on AMQP support, which needs the protocol jar on the classpath. - // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop - // It does not use AMQP messages for its own messages e.g. topology and heartbeats - // TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications - PROTOCOLS_PROP_NAME to "CORE,AMQP", - - // Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake - // and AES encryption - SSL_ENABLED_PROP_NAME to true, - KEYSTORE_PROVIDER_PROP_NAME to "JKS", - KEYSTORE_PATH_PROP_NAME to keyStorePath, - KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password - TRUSTSTORE_PROVIDER_PROP_NAME to "JKS", - TRUSTSTORE_PATH_PROP_NAME to trustStorePath, - TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword, - ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","), - ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2", - NEED_CLIENT_AUTH_PROP_NAME to true - ) - ) - - /** - * Strictly for dev only automatically construct a server certificate/private key signed from - * the CA certs in Node resources. Then provision KeyStores into certificates folder under node path. - */ - fun configureWithDevSSLCertificate() { - Files.createDirectories(directory.resolve("certificates")) - if (!Files.exists(trustStorePath)) { - Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"), - trustStorePath) - } - if (!Files.exists(keyStorePath)) { - val caKeyStore = X509Utilities.loadKeyStore( - javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordadevcakeys.jks"), - "cordacadevpass") - X509Utilities.createKeystoreForSSL(keyStorePath, config.keyStorePassword, config.keyStorePassword, caKeyStore, "cordacadevkeypass") - } - } - -} diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index 1940430fc7..29d7908d8a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -12,6 +12,7 @@ import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace import com.r3corda.node.services.api.MessagingServiceBuilder import com.r3corda.node.services.api.MessagingServiceInternal +import com.r3corda.node.services.network.InMemoryMessagingNetwork.InMemoryMessaging import org.slf4j.LoggerFactory import rx.Observable import rx.subjects.PublishSubject @@ -223,8 +224,6 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } } - override fun registerTrustedAddress(address: SingleMessageRecipient) {} - override fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration = addMessageHandler(TopicSession(topic, sessionID), executor, callback) diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index b023efbc69..1e99eaff4c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -100,7 +100,6 @@ open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : override fun addNode(node: NodeInfo) { registeredNodes[node.identity] = node - netInternal?.registerTrustedAddress(node.address) _changed.onNext(MapChange(node, MapChangeType.Added)) } diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt deleted file mode 100644 index a068ce61f9..0000000000 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt +++ /dev/null @@ -1,78 +0,0 @@ -package com.r3corda.node.services - -import com.r3corda.core.messaging.Message -import com.r3corda.core.node.services.DEFAULT_SESSION_ID -import com.r3corda.core.testing.freeLocalHostAndPort -import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingService -import org.assertj.core.api.Assertions.assertThatThrownBy -import org.junit.After -import org.junit.Rule -import org.junit.Test -import org.junit.rules.TemporaryFolder -import java.net.ServerSocket -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit.MILLISECONDS -import java.util.concurrent.TimeUnit.SECONDS -import kotlin.test.assertEquals -import kotlin.test.assertNotNull -import kotlin.test.assertNull - -class ArtemisMessagingServiceTests { - - @Rule @JvmField val temporaryFolder = TemporaryFolder() - - val hostAndPort = freeLocalHostAndPort() - val topic = "platform.self" - - var messagingNetwork: ArtemisMessagingService? = null - - @After - fun cleanUp() { - messagingNetwork?.stop() - } - - @Test - fun `starting with the port already bound`() { - ServerSocket(hostAndPort.port).use { - val messagingNetwork = createMessagingService() - assertThatThrownBy { messagingNetwork.start() } - } - } - - @Test - fun `sending message to self`() { - val receivedMessages = LinkedBlockingQueue() - - val messagingNetwork = createMessagingService() - messagingNetwork.start() - - messagingNetwork.addMessageHandler(topic) { message, r -> - receivedMessages.add(message) - } - - val message = messagingNetwork.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) - messagingNetwork.send(message, messagingNetwork.myAddress) - - val actual = receivedMessages.poll(2, SECONDS) - assertNotNull(actual) - assertEquals("first msg", String(actual.data)) - assertNull(receivedMessages.poll(200, MILLISECONDS)) - } - - private fun createMessagingService(): ArtemisMessagingService { - val config = object: NodeConfiguration { - override val myLegalName: String = "me" - override val exportJMXto: String = "" - override val nearestCity: String = "London" - override val keyStorePassword: String = "testpass" - override val trustStorePassword: String = "trustpass" - - } - return ArtemisMessagingService(temporaryFolder.newFolder().toPath(), hostAndPort, config).apply { - configureWithDevSSLCertificate() - messagingNetwork = this - } - } - -} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt new file mode 100644 index 0000000000..0ca66bef07 --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -0,0 +1,114 @@ +package com.r3corda.node.services + +import com.google.common.net.HostAndPort +import com.r3corda.core.messaging.Message +import com.r3corda.core.node.services.DEFAULT_SESSION_ID +import com.r3corda.core.testing.freeLocalHostAndPort +import com.r3corda.node.services.config.NodeConfiguration +import com.r3corda.node.services.messaging.ArtemisMessagingClient +import com.r3corda.node.services.messaging.ArtemisMessagingServer +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.After +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.net.ServerSocket +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.TimeUnit.SECONDS +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull + +class ArtemisMessagingTests { + @Rule @JvmField val temporaryFolder = TemporaryFolder() + + val hostAndPort = freeLocalHostAndPort() + val topic = "platform.self" + val config = object : NodeConfiguration { + override val myLegalName: String = "me" + override val exportJMXto: String = "" + override val nearestCity: String = "London" + override val keyStorePassword: String = "testpass" + override val trustStorePassword: String = "trustpass" + } + + var messagingClient: ArtemisMessagingClient? = null + var messagingServer: ArtemisMessagingServer? = null + + @After + fun cleanUp() { + messagingClient?.stop() + messagingServer?.stop() + } + + @Test + fun `server starting with the port already bound should throw`() { + ServerSocket(hostAndPort.port).use { + val messagingServer = createMessagingServer() + assertThatThrownBy { messagingServer.start() } + } + } + + @Test + fun `client should connect to remote server`() { + val remoteServerAddress = freeLocalHostAndPort() + + createMessagingServer(remoteServerAddress).start() + createMessagingClient(server = remoteServerAddress).start() + } + + @Test + fun `client should throw if remote server not found`() { + val serverAddress = freeLocalHostAndPort() + val invalidServerAddress = freeLocalHostAndPort() + + createMessagingServer(serverAddress).start() + + val messagingClient = createMessagingClient(server = invalidServerAddress) + assertThatThrownBy { messagingClient.start() } + } + + @Test + fun `client should connect to local server`() { + createMessagingServer().start() + createMessagingClient().start() + } + + @Test + fun `client should be able to send message to itself`() { + val receivedMessages = LinkedBlockingQueue() + + createMessagingServer().start() + + val messagingClient = createMessagingClient() + messagingClient.start() + + messagingClient.addMessageHandler(topic) { message, r -> + receivedMessages.add(message) + } + + val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray()) + messagingClient.send(message, messagingClient.myAddress) + + val actual = receivedMessages.poll(2, SECONDS) + assertNotNull(actual) + assertEquals("first msg", String(actual.data)) + assertNull(receivedMessages.poll(200, MILLISECONDS)) + } + + private fun createMessagingClient(server: HostAndPort = hostAndPort, + local: HostAndPort = hostAndPort): ArtemisMessagingClient { + return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local).apply { + configureWithDevSSLCertificate() + messagingClient = this + } + } + + private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { + return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local).apply { + configureWithDevSSLCertificate() + messagingServer = this + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index c6977b52c5..51dbf3a2a3 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -14,19 +14,24 @@ import com.r3corda.demos.api.InterestRateSwapAPI import com.r3corda.demos.protocols.AutoOfferProtocol import com.r3corda.demos.protocols.ExitServerProtocol import com.r3corda.demos.protocols.UpdateBusinessDayProtocol +import com.r3corda.demos.utilities.postJson +import com.r3corda.demos.utilities.putJson +import com.r3corda.demos.utilities.uploadFile import com.r3corda.node.internal.AbstractNode import com.r3corda.node.internal.Node import com.r3corda.node.internal.testing.MockNetwork import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfigurationFromConfig -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService import com.typesafe.config.ConfigFactory import joptsimple.OptionParser import joptsimple.OptionSet import org.apache.commons.io.IOUtils +import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.io.File import java.net.URL import java.nio.file.Files @@ -35,9 +40,6 @@ import java.nio.file.Paths import java.util.* import kotlin.concurrent.fixedRateTimer import kotlin.system.exitProcess -import com.r3corda.demos.utilities.* -import org.slf4j.Logger -import org.slf4j.LoggerFactory // IRS DEMO // @@ -378,7 +380,7 @@ private fun runTrade(cliParams: CliParams.Trade): Int { private fun createRecipient(addr: String) : SingleMessageRecipient { val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT) - return ArtemisMessagingService.makeRecipient(hostAndPort) + return ArtemisMessagingClient.makeRecipient(hostAndPort) } private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient) : Node { diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 99037c5957..39e86788ff 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -14,7 +14,7 @@ import com.r3corda.core.utilities.Emoji import com.r3corda.node.internal.Node import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.NotaryService import com.r3corda.protocols.RatesFixProtocol @@ -56,12 +56,12 @@ fun main(args: Array) { BriefLogFormatter.initVerbose("+demo.ratefix", "-org.apache.activemq") val dir = Paths.get(options.valueOf(dirArg)) - val networkMapAddr = ArtemisMessagingService.makeRecipient(options.valueOf(networkMapAddrArg)) + val networkMapAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(networkMapAddrArg)) val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize() val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type)) // Load oracle stuff (in lieu of having a network map service) - val oracleAddr = ArtemisMessagingService.makeRecipient(options.valueOf(oracleAddrArg)) + val oracleAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(oracleAddrArg)) val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize() val oracleNode = NodeInfo(oracleAddr, oracleIdentity) @@ -71,7 +71,7 @@ fun main(args: Array) { // Bring up node. val advertisedServices: Set = emptySet() - val myNetAddr = ArtemisMessagingService.toHostAndPort(options.valueOf(networkAddressArg)) + val myNetAddr = ArtemisMessagingClient.toHostAndPort(options.valueOf(networkAddressArg)) val config = object : NodeConfiguration { override val myLegalName: String = "Rate fix demo node" override val exportJMXto: String = "http" diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index ed6c6509c2..2f5da40976 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -24,7 +24,7 @@ import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.ProgressTracker import com.r3corda.node.internal.Node import com.r3corda.node.services.config.NodeConfigurationFromConfig -import com.r3corda.node.services.messaging.ArtemisMessagingService +import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.transactions.SimpleNotaryService @@ -147,7 +147,7 @@ fun runTraderDemo(args: Array): Int { val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public") val party = Files.readAllBytes(path).deserialize() advertisedServices = emptySet() - NodeInfo(ArtemisMessagingService.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type)) + NodeInfo(ArtemisMessagingClient.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type)) } // And now construct then start the node object. It takes a little while.