Merged in rnicoll-in-memory-networking (pull request #232)

Correct processing of unhandled messages
This commit is contained in:
Ross Nicoll 2016-07-19 14:20:29 +01:00
commit ab7ddaa264
2 changed files with 70 additions and 15 deletions

View File

@ -283,24 +283,46 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
return pumpReceiveInternal(block)
}
/**
* Get the next transfer, and matching queue, that is ready to handle. Any pending transfers without handlers
* are placed into `pendingRedelivery` to try again later.
*
* @param block if this should block until a message it can process.
*/
private fun getNextQueue(q: LinkedBlockingQueue<MessageTransfer>, block: Boolean): Pair<MessageTransfer, List<Handler>>? {
var deliverTo: List<Handler>? = null
// Pop transfers off the queue until we run out (and are not blocking), or find something we can process
while (deliverTo == null) {
val transfer = (if (block) q.take() else q.poll()) ?: return null
deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
if (h.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
pendingRedelivery.add(transfer)
null
} else {
h
}
}
if (deliverTo != null) {
return Pair(transfer, deliverTo)
}
}
return null
}
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
val q = getQueueForHandle(handle)
val transfer = (if (block) q.take() else q.poll()) ?: return null
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
if (h.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
pendingRedelivery.add(transfer)
return null
}
h
val next = getNextQueue(q, block)
if (next == null) {
return null
}
val (transfer, deliverTo) = next
for (handler in deliverTo) {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.

View File

@ -82,4 +82,37 @@ class InMemoryMessagingTests {
network.runNetwork(rounds = 1)
assertEquals(3, counter)
}
/**
* Tests that unhandled messages in the received queue are skipped and the next message processed, rather than
* causing processing to return null as if there was no message.
*/
@Test
fun `skip unhandled messages`() {
val node1 = network.createNode()
val node2 = network.createNode()
var received: Int = 0
node1.net.addMessageHandler("valid_message") { msg, reg ->
received++
}
val invalidMessage = node2.net.createMessage("invalid_message", ByteArray(0))
val validMessage = node2.net.createMessage("valid_message", ByteArray(0))
node2.net.send(invalidMessage, node1.net.myAddress)
network.runNetwork()
assertEquals(0, received)
node2.net.send(validMessage, node1.net.myAddress)
network.runNetwork()
assertEquals(1, received)
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
// this would fail.
node2.net.send(invalidMessage, node1.net.myAddress)
node2.net.send(validMessage, node1.net.myAddress)
network.runNetwork()
assertEquals(2, received)
}
}