diff --git a/build.gradle b/build.gradle index 86ee5870ed..a72f742a04 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ buildscript { ext.kotlin_version = '1.0.2' ext.quasar_version = '0.7.5' ext.asm_version = '0.5.3' - ext.artemis_version = '1.2.0' + ext.artemis_version = '1.3.0' ext.jetty_version = '9.1.1.v20140108' ext.jersey_version = '2.22.2' ext.jolokia_version = '2.0.0-M1' diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt index b6206e3abe..8756e40bb0 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt @@ -22,12 +22,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME import org.apache.activemq.artemis.core.security.Role -import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ +import org.apache.activemq.artemis.core.server.ActiveMQServer +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule import java.math.BigInteger +import java.nio.file.FileSystems import java.nio.file.Path -import java.security.SecureRandom import java.time.Instant import java.util.* import java.util.concurrent.CopyOnWriteArrayList @@ -53,8 +54,10 @@ import javax.annotation.concurrent.ThreadSafe * @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified. */ @ThreadSafe -class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, +class ArtemisMessagingService(val directory: Path, + val myHostPort: HostAndPort, val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService { + // In future: can contain onion routing info, etc. private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient @@ -73,10 +76,10 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT) } - private lateinit var mq: EmbeddedActiveMQ + private lateinit var activeMQServer: ActiveMQServer private lateinit var clientFactory: ClientSessionFactory - private lateinit var session: ClientSession - private lateinit var inboundConsumer: ClientConsumer + private var session: ClientSession? = null + private var inboundConsumer: ClientConsumer? = null private class InnerState { var running = false @@ -86,7 +89,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, private val mutex = ThreadBox(InnerState()) /** A registration to handle messages of different types */ - inner class Handler(val executor: Executor?, val topic: String, + inner class Handler(val executor: Executor?, + val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration private val handlers = CopyOnWriteArrayList() @@ -94,13 +98,17 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. private val undeliveredMessages = CopyOnWriteArrayList() + init { + require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } + } + private fun getSendClient(address: Address): ClientProducer { return mutex.locked { sendClients.getOrPut(address) { if (address != myAddress) { maybeSetupConnection(address.hostAndPort) } - session.createProducer(address.hostAndPort.toString()) + session!!.createProducer(address.hostAndPort.toString()) } } } @@ -112,31 +120,31 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, // reuse it, or if it makes sense for scaling to split the functionality out, or if it makes sense for security. // // But for now, we bundle it all up into one thing. - mq = EmbeddedActiveMQ() - val config = createArtemisConfig(directory, myHostPort) - mq.setConfiguration(config) - val secConfig = SecurityConfiguration() - val password = BigInteger(128, newSecureRandom()).toString(16) - secConfig.addUser("internal", password) - secConfig.addRole("internal", "internal") - secConfig.defaultUser = "internal" - config.securityRoles = mapOf( - "#" to setOf(Role("internal", true, true, true, true, true, true, true)) - ) - val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig) - mq.setSecurityManager(secManager) + val config = createArtemisConfig(directory, myHostPort).apply { + securityRoles = mapOf( + "#" to setOf(Role("internal", true, true, true, true, true, true, true)) + ) + } - // TODO Currently we cannot find out if something goes wrong during startup :( This is bug ARTEMIS-388 filed by me. - // The fix should be in the 1.3.0 release: - // https://issues.apache.org/jira/browse/ARTEMIS-388 - mq.start() + val securityConfig = SecurityConfiguration().apply { + addUser("internal", BigInteger(128, newSecureRandom()).toString(16)) + addRole("internal", "internal") + defaultUser = "internal" + } + val securityManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig) + + activeMQServer = ActiveMQServerImpl(config, securityManager) + // Throw any exceptions which are detected during startup + activeMQServer.registerActivationFailureListener { exception -> throw exception } + activeMQServer.start() // Connect to our in-memory server. clientFactory = ActiveMQClient.createServerLocatorWithoutHA( TransportConfiguration(InVMConnectorFactory::class.java.name)).createSessionFactory() // Create a queue on which to receive messages and set up the handler. - session = clientFactory.createSession() + val session = clientFactory.createSession() + this.session = session session.createQueue(myHostPort.toString(), "inbound", false) inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage -> @@ -206,9 +214,9 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, for (producer in sendClients.values) producer.close() sendClients.clear() - inboundConsumer.close() - session.close() - mq.stop() + inboundConsumer?.close() + session?.close() + activeMQServer.stop() // We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here. @@ -219,7 +227,7 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, override fun send(message: Message, target: MessageRecipients) { if (target !is Address) TODO("Only simple sends to single recipients are currently implemented") - val artemisMessage = session.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data) + val artemisMessage = session!!.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data) getSendClient(target).send(artemisMessage) } @@ -266,13 +274,13 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, // entirely duplex. The Artemis team may add this functionality in future: // // https://issues.apache.org/jira/browse/ARTEMIS-355 - if (!session.queueQuery(SimpleString(name)).isExists) { - session.createQueue(name, name, true /* durable */) + if (!session!!.queueQuery(SimpleString(name)).isExists) { + session!!.createQueue(name, name, true /* durable */) } - if (!mq.activeMQServer.configuration.connectorConfigurations.containsKey(name)) { - mq.activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND, + if (!activeMQServer.configuration.connectorConfigurations.containsKey(name)) { + activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND, hostAndPort.hostText, hostAndPort.port)) - mq.activeMQServer.deployBridge(BridgeConfiguration().apply { + activeMQServer.deployBridge(BridgeConfiguration().apply { setName(name) queueName = name forwardingAddress = name diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt index 25eb9d69f9..917b2ae9d1 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingServiceTests.kt @@ -1,15 +1,15 @@ package com.r3corda.node.services import com.r3corda.core.messaging.Message -import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.testing.freeLocalHostAndPort import com.r3corda.node.services.messaging.ArtemisMessagingService import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After -import org.junit.Before import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import java.net.ServerSocket import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.SECONDS @@ -18,36 +18,46 @@ class ArtemisMessagingServiceTests { @Rule @JvmField val temporaryFolder = TemporaryFolder() + val hostAndPort = freeLocalHostAndPort() val topic = "platform.self" - lateinit var messagingNetwork: ArtemisMessagingService - @Before - fun setUp() { - messagingNetwork = ArtemisMessagingService(temporaryFolder.newFolder().toPath(), freeLocalHostAndPort()) - messagingNetwork.start() - } + var messagingNetwork: ArtemisMessagingService? = null @After - fun tearDown() { - messagingNetwork.stop() + fun cleanUp() { + messagingNetwork?.stop() + } + + @Test + fun `starting with the port already bound`() { + ServerSocket(hostAndPort.port).use { + val messagingNetwork = createMessagingService() + assertThatThrownBy { messagingNetwork.start() } + } } @Test fun `sending message to self`() { val receivedMessages = LinkedBlockingQueue() + val messagingNetwork = createMessagingService() + messagingNetwork.start() + messagingNetwork.addMessageHandler(topic) { message, r -> receivedMessages.add(message) } - sendMessage("first msg", messagingNetwork.myAddress) + val message = messagingNetwork.createMessage(topic, "first msg".toByteArray()) + messagingNetwork.send(message, messagingNetwork.myAddress) assertThat(String(receivedMessages.poll(2, SECONDS).data)).isEqualTo("first msg") assertThat(receivedMessages.poll(200, MILLISECONDS)).isNull() } - private fun sendMessage(body: String, address: MessageRecipients) { - messagingNetwork.send(messagingNetwork.createMessage(topic, body.toByteArray()), address) + private fun createMessagingService(): ArtemisMessagingService { + return ArtemisMessagingService(temporaryFolder.newFolder().toPath(), hostAndPort).apply { + messagingNetwork = this + } } } \ No newline at end of file