Sending message to self no longer goes into continuous loop

This commit is contained in:
Shams Asari 2016-04-27 10:45:02 +01:00
parent 47401a2a28
commit 0575bcc959
4 changed files with 75 additions and 13 deletions

View File

@ -118,6 +118,7 @@ dependencies {
// Unit testing helpers.
testCompile 'junit:junit:4.12'
testCompile 'org.assertj:assertj-core:3.4.1'
}
// These lines tell Gradle to add a couple of JVM command line arguments to unit test and program runs, which set up

View File

@ -17,7 +17,8 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
@ -91,12 +92,13 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
private val undeliveredMessages = CopyOnWriteArrayList<Message>()
private fun getSendClient(addr: Address): ClientProducer {
private fun getSendClient(address: Address): ClientProducer {
return mutex.locked {
sendClients.getOrPut(addr) {
maybeSetupConnection(addr.hostAndPort)
val qName = addr.hostAndPort.toString()
session.createProducer(qName)
sendClients.getOrPut(address) {
if (address != myAddress) {
maybeSetupConnection(address.hostAndPort)
}
session.createProducer(address.hostAndPort.toString())
}
}
}
@ -144,15 +146,15 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
}
val topic = message.getStringProperty(TOPIC_PROPERTY)
val bits = ByteArray(message.bodySize)
message.bodyBuffer.readBytes(bits)
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val msg = object : Message {
override val topic = topic
override val data: ByteArray = bits
override val data: ByteArray = body
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val debugMessageID: String = message.messageID.toString()
override fun serialise(): ByteArray = bits
override fun serialise(): ByteArray = body
override fun toString() = topic + "#" + String(data)
}
deliverMessage(msg)
@ -302,8 +304,8 @@ class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort,
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt()
HOST_PROP_NAME to host,
PORT_PROP_NAME to port.toInt()
)
)

View File

@ -0,0 +1,52 @@
package core.node.services
import core.messaging.Message
import core.messaging.MessageRecipients
import core.testutils.freeLocalHostAndPort
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
class ArtemisMessagingServiceTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()
val topic = "platform.self"
lateinit var messagingNetwork: ArtemisMessagingService
@Before
fun setUp() {
messagingNetwork = ArtemisMessagingService(temporaryFolder.newFolder().toPath(), freeLocalHostAndPort())
messagingNetwork.start()
}
@After
fun tearDown() {
messagingNetwork.stop()
}
@Test
fun `sending message to self`() {
val receivedMessages = LinkedBlockingQueue<Message>()
messagingNetwork.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
sendMessage("first msg", messagingNetwork.myAddress)
assertThat(String(receivedMessages.poll(2, SECONDS).data)).isEqualTo("first msg")
assertThat(receivedMessages.poll(200, MILLISECONDS)).isNull()
}
private fun sendMessage(body: String, address: MessageRecipients) {
messagingNetwork.send(messagingNetwork.createMessage(topic, body.toByteArray()), address)
}
}

View File

@ -3,13 +3,15 @@
package core.testutils
import com.google.common.base.Throwables
import com.google.common.net.HostAndPort
import contracts.*
import core.*
import core.crypto.*
import core.node.services.DummyTimestampingAuthority
import core.testing.MockIdentityService
import core.serialization.serialize
import core.testing.MockIdentityService
import core.visualiser.GraphVisualiser
import java.net.ServerSocket
import java.security.KeyPair
import java.security.PublicKey
import java.time.Instant
@ -27,6 +29,11 @@ inline fun <R> rootCauseExceptions(body: () -> R): R {
}
}
fun freeLocalHostAndPort(): HostAndPort {
val freePort = ServerSocket(0).use { it.localPort }
return HostAndPort.fromParts("localhost", freePort)
}
object TestUtils {
val keypair = generateKeyPair()
val keypair2 = generateKeyPair()