From 0575bcc9599d9559d1dba427ea262906517a9d5d Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Wed, 27 Apr 2016 10:45:02 +0100 Subject: [PATCH] Sending message to self no longer goes into continuous loop --- build.gradle | 1 + .../node/services/ArtemisMessagingService.kt | 26 +++++----- .../services/ArtemisMessagingServiceTests.kt | 52 +++++++++++++++++++ src/test/kotlin/core/testutils/TestUtils.kt | 9 +++- 4 files changed, 75 insertions(+), 13 deletions(-) create mode 100644 src/test/kotlin/core/node/services/ArtemisMessagingServiceTests.kt diff --git a/build.gradle b/build.gradle index 2a9de5dc78..c5d3e403ab 100644 --- a/build.gradle +++ b/build.gradle @@ -118,6 +118,7 @@ dependencies { // Unit testing helpers. testCompile 'junit:junit:4.12' + testCompile 'org.assertj:assertj-core:3.4.1' } // These lines tell Gradle to add a couple of JVM command line arguments to unit test and program runs, which set up diff --git a/src/main/kotlin/core/node/services/ArtemisMessagingService.kt b/src/main/kotlin/core/node/services/ArtemisMessagingService.kt index 914c024845..56b3a8b619 100644 --- a/src/main/kotlin/core/node/services/ArtemisMessagingService.kt +++ b/src/main/kotlin/core/node/services/ArtemisMessagingService.kt @@ -17,7 +17,8 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager @@ -91,12 +92,13 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. private val undeliveredMessages = CopyOnWriteArrayList() - private fun getSendClient(addr: Address): ClientProducer { + private fun getSendClient(address: Address): ClientProducer { return mutex.locked { - sendClients.getOrPut(addr) { - maybeSetupConnection(addr.hostAndPort) - val qName = addr.hostAndPort.toString() - session.createProducer(qName) + sendClients.getOrPut(address) { + if (address != myAddress) { + maybeSetupConnection(address.hostAndPort) + } + session.createProducer(address.hostAndPort.toString()) } } } @@ -144,15 +146,15 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, } val topic = message.getStringProperty(TOPIC_PROPERTY) - val bits = ByteArray(message.bodySize) - message.bodyBuffer.readBytes(bits) + val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } val msg = object : Message { override val topic = topic - override val data: ByteArray = bits + override val data: ByteArray = body override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) override val debugMessageID: String = message.messageID.toString() - override fun serialise(): ByteArray = bits + override fun serialise(): ByteArray = body + override fun toString() = topic + "#" + String(data) } deliverMessage(msg) @@ -302,8 +304,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name }, mapOf( - TransportConstants.HOST_PROP_NAME to host, - TransportConstants.PORT_PROP_NAME to port.toInt() + HOST_PROP_NAME to host, + PORT_PROP_NAME to port.toInt() ) ) diff --git a/src/test/kotlin/core/node/services/ArtemisMessagingServiceTests.kt b/src/test/kotlin/core/node/services/ArtemisMessagingServiceTests.kt new file mode 100644 index 0000000000..369fc0d99a --- /dev/null +++ b/src/test/kotlin/core/node/services/ArtemisMessagingServiceTests.kt @@ -0,0 +1,52 @@ +package core.node.services + +import core.messaging.Message +import core.messaging.MessageRecipients +import core.testutils.freeLocalHostAndPort +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.TimeUnit.SECONDS + +class ArtemisMessagingServiceTests { + + @Rule @JvmField val temporaryFolder = TemporaryFolder() + + val topic = "platform.self" + lateinit var messagingNetwork: ArtemisMessagingService + + @Before + fun setUp() { + messagingNetwork = ArtemisMessagingService(temporaryFolder.newFolder().toPath(), freeLocalHostAndPort()) + messagingNetwork.start() + } + + @After + fun tearDown() { + messagingNetwork.stop() + } + + @Test + fun `sending message to self`() { + val receivedMessages = LinkedBlockingQueue() + + messagingNetwork.addMessageHandler(topic) { message, r -> + receivedMessages.add(message) + } + + sendMessage("first msg", messagingNetwork.myAddress) + + assertThat(String(receivedMessages.poll(2, SECONDS).data)).isEqualTo("first msg") + assertThat(receivedMessages.poll(200, MILLISECONDS)).isNull() + } + + private fun sendMessage(body: String, address: MessageRecipients) { + messagingNetwork.send(messagingNetwork.createMessage(topic, body.toByteArray()), address) + } + +} diff --git a/src/test/kotlin/core/testutils/TestUtils.kt b/src/test/kotlin/core/testutils/TestUtils.kt index 6d71d0e2af..fa5d705dc8 100644 --- a/src/test/kotlin/core/testutils/TestUtils.kt +++ b/src/test/kotlin/core/testutils/TestUtils.kt @@ -3,13 +3,15 @@ package core.testutils import com.google.common.base.Throwables +import com.google.common.net.HostAndPort import contracts.* import core.* import core.crypto.* import core.node.services.DummyTimestampingAuthority -import core.testing.MockIdentityService import core.serialization.serialize +import core.testing.MockIdentityService import core.visualiser.GraphVisualiser +import java.net.ServerSocket import java.security.KeyPair import java.security.PublicKey import java.time.Instant @@ -27,6 +29,11 @@ inline fun rootCauseExceptions(body: () -> R): R { } } +fun freeLocalHostAndPort(): HostAndPort { + val freePort = ServerSocket(0).use { it.localPort } + return HostAndPort.fromParts("localhost", freePort) +} + object TestUtils { val keypair = generateKeyPair() val keypair2 = generateKeyPair()