mirror of
https://github.com/corda/corda.git
synced 2025-02-01 08:48:09 +00:00
Upgraded to artemis 1.3.0 so that exceptions are thrown if something goes wrong on startup
This commit is contained in:
parent
8f57213270
commit
411fd1aeb3
@ -16,7 +16,7 @@ buildscript {
|
||||
ext.kotlin_version = '1.0.2'
|
||||
ext.quasar_version = '0.7.5'
|
||||
ext.asm_version = '0.5.3'
|
||||
ext.artemis_version = '1.2.0'
|
||||
ext.artemis_version = '1.3.0'
|
||||
ext.jetty_version = '9.1.1.v20140108'
|
||||
ext.jersey_version = '2.22.2'
|
||||
ext.jolokia_version = '2.0.0-M1'
|
||||
|
@ -22,12 +22,13 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME
|
||||
import org.apache.activemq.artemis.core.security.Role
|
||||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
|
||||
import java.math.BigInteger
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.security.SecureRandom
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
@ -53,8 +54,10 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* @param defaultExecutor This will be used as the default executor to run message handlers on, if no other is specified.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
class ArtemisMessagingService(val directory: Path,
|
||||
val myHostPort: HostAndPort,
|
||||
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService {
|
||||
|
||||
// In future: can contain onion routing info, etc.
|
||||
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
|
||||
|
||||
@ -73,10 +76,10 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT)
|
||||
}
|
||||
|
||||
private lateinit var mq: EmbeddedActiveMQ
|
||||
private lateinit var activeMQServer: ActiveMQServer
|
||||
private lateinit var clientFactory: ClientSessionFactory
|
||||
private lateinit var session: ClientSession
|
||||
private lateinit var inboundConsumer: ClientConsumer
|
||||
private var session: ClientSession? = null
|
||||
private var inboundConsumer: ClientConsumer? = null
|
||||
|
||||
private class InnerState {
|
||||
var running = false
|
||||
@ -86,7 +89,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
inner class Handler(val executor: Executor?, val topic: String,
|
||||
inner class Handler(val executor: Executor?,
|
||||
val topic: String,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
@ -94,13 +98,17 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
|
||||
private val undeliveredMessages = CopyOnWriteArrayList<Message>()
|
||||
|
||||
init {
|
||||
require(directory.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
|
||||
}
|
||||
|
||||
private fun getSendClient(address: Address): ClientProducer {
|
||||
return mutex.locked {
|
||||
sendClients.getOrPut(address) {
|
||||
if (address != myAddress) {
|
||||
maybeSetupConnection(address.hostAndPort)
|
||||
}
|
||||
session.createProducer(address.hostAndPort.toString())
|
||||
session!!.createProducer(address.hostAndPort.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -112,31 +120,31 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
// reuse it, or if it makes sense for scaling to split the functionality out, or if it makes sense for security.
|
||||
//
|
||||
// But for now, we bundle it all up into one thing.
|
||||
mq = EmbeddedActiveMQ()
|
||||
val config = createArtemisConfig(directory, myHostPort)
|
||||
mq.setConfiguration(config)
|
||||
val secConfig = SecurityConfiguration()
|
||||
val password = BigInteger(128, newSecureRandom()).toString(16)
|
||||
secConfig.addUser("internal", password)
|
||||
secConfig.addRole("internal", "internal")
|
||||
secConfig.defaultUser = "internal"
|
||||
config.securityRoles = mapOf(
|
||||
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
|
||||
)
|
||||
val secManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, secConfig)
|
||||
mq.setSecurityManager(secManager)
|
||||
val config = createArtemisConfig(directory, myHostPort).apply {
|
||||
securityRoles = mapOf(
|
||||
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
|
||||
)
|
||||
}
|
||||
|
||||
// TODO Currently we cannot find out if something goes wrong during startup :( This is bug ARTEMIS-388 filed by me.
|
||||
// The fix should be in the 1.3.0 release:
|
||||
// https://issues.apache.org/jira/browse/ARTEMIS-388
|
||||
mq.start()
|
||||
val securityConfig = SecurityConfiguration().apply {
|
||||
addUser("internal", BigInteger(128, newSecureRandom()).toString(16))
|
||||
addRole("internal", "internal")
|
||||
defaultUser = "internal"
|
||||
}
|
||||
val securityManager = ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
|
||||
|
||||
activeMQServer = ActiveMQServerImpl(config, securityManager)
|
||||
// Throw any exceptions which are detected during startup
|
||||
activeMQServer.registerActivationFailureListener { exception -> throw exception }
|
||||
activeMQServer.start()
|
||||
|
||||
// Connect to our in-memory server.
|
||||
clientFactory = ActiveMQClient.createServerLocatorWithoutHA(
|
||||
TransportConfiguration(InVMConnectorFactory::class.java.name)).createSessionFactory()
|
||||
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
session = clientFactory.createSession()
|
||||
val session = clientFactory.createSession()
|
||||
this.session = session
|
||||
|
||||
session.createQueue(myHostPort.toString(), "inbound", false)
|
||||
inboundConsumer = session.createConsumer("inbound").setMessageHandler { message: ClientMessage ->
|
||||
@ -206,9 +214,9 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
for (producer in sendClients.values)
|
||||
producer.close()
|
||||
sendClients.clear()
|
||||
inboundConsumer.close()
|
||||
session.close()
|
||||
mq.stop()
|
||||
inboundConsumer?.close()
|
||||
session?.close()
|
||||
activeMQServer.stop()
|
||||
|
||||
// We expect to be garbage collected shortly after being stopped, so we don't null anything explicitly here.
|
||||
|
||||
@ -219,7 +227,7 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
if (target !is Address)
|
||||
TODO("Only simple sends to single recipients are currently implemented")
|
||||
val artemisMessage = session.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data)
|
||||
val artemisMessage = session!!.createMessage(true).putStringProperty("platform-topic", message.topic).writeBodyBufferBytes(message.data)
|
||||
getSendClient(target).send(artemisMessage)
|
||||
}
|
||||
|
||||
@ -266,13 +274,13 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
|
||||
// entirely duplex. The Artemis team may add this functionality in future:
|
||||
//
|
||||
// https://issues.apache.org/jira/browse/ARTEMIS-355
|
||||
if (!session.queueQuery(SimpleString(name)).isExists) {
|
||||
session.createQueue(name, name, true /* durable */)
|
||||
if (!session!!.queueQuery(SimpleString(name)).isExists) {
|
||||
session!!.createQueue(name, name, true /* durable */)
|
||||
}
|
||||
if (!mq.activeMQServer.configuration.connectorConfigurations.containsKey(name)) {
|
||||
mq.activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND,
|
||||
if (!activeMQServer.configuration.connectorConfigurations.containsKey(name)) {
|
||||
activeMQServer.configuration.addConnectorConfiguration(name, tcpTransport(ConnectionDirection.OUTBOUND,
|
||||
hostAndPort.hostText, hostAndPort.port))
|
||||
mq.activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
setName(name)
|
||||
queueName = name
|
||||
forwardingAddress = name
|
||||
|
@ -1,15 +1,15 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.testing.freeLocalHostAndPort
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingService
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.net.ServerSocket
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
@ -18,36 +18,46 @@ class ArtemisMessagingServiceTests {
|
||||
|
||||
@Rule @JvmField val temporaryFolder = TemporaryFolder()
|
||||
|
||||
val hostAndPort = freeLocalHostAndPort()
|
||||
val topic = "platform.self"
|
||||
lateinit var messagingNetwork: ArtemisMessagingService
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
messagingNetwork = ArtemisMessagingService(temporaryFolder.newFolder().toPath(), freeLocalHostAndPort())
|
||||
messagingNetwork.start()
|
||||
}
|
||||
var messagingNetwork: ArtemisMessagingService? = null
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
messagingNetwork.stop()
|
||||
fun cleanUp() {
|
||||
messagingNetwork?.stop()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `starting with the port already bound`() {
|
||||
ServerSocket(hostAndPort.port).use {
|
||||
val messagingNetwork = createMessagingService()
|
||||
assertThatThrownBy { messagingNetwork.start() }
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `sending message to self`() {
|
||||
val receivedMessages = LinkedBlockingQueue<Message>()
|
||||
|
||||
val messagingNetwork = createMessagingService()
|
||||
messagingNetwork.start()
|
||||
|
||||
messagingNetwork.addMessageHandler(topic) { message, r ->
|
||||
receivedMessages.add(message)
|
||||
}
|
||||
|
||||
sendMessage("first msg", messagingNetwork.myAddress)
|
||||
val message = messagingNetwork.createMessage(topic, "first msg".toByteArray())
|
||||
messagingNetwork.send(message, messagingNetwork.myAddress)
|
||||
|
||||
assertThat(String(receivedMessages.poll(2, SECONDS).data)).isEqualTo("first msg")
|
||||
assertThat(receivedMessages.poll(200, MILLISECONDS)).isNull()
|
||||
}
|
||||
|
||||
private fun sendMessage(body: String, address: MessageRecipients) {
|
||||
messagingNetwork.send(messagingNetwork.createMessage(topic, body.toByteArray()), address)
|
||||
private fun createMessagingService(): ArtemisMessagingService {
|
||||
return ArtemisMessagingService(temporaryFolder.newFolder().toPath(), hostAndPort).apply {
|
||||
messagingNetwork = this
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user