diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index 09b0e54b5e..5ff369304f 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -283,24 +283,46 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria return pumpReceiveInternal(block) } + /** + * Get the next transfer, and matching queue, that is ready to handle. Any pending transfers without handlers + * are placed into `pendingRedelivery` to try again later. + * + * @param block if this should block until a message it can process. + */ + private fun getNextQueue(q: LinkedBlockingQueue, block: Boolean): Pair>? { + var deliverTo: List? = null + // Pop transfers off the queue until we run out (and are not blocking), or find something we can process + while (deliverTo == null) { + val transfer = (if (block) q.take() else q.poll()) ?: return null + deliverTo = state.locked { + val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } + + if (h.isEmpty()) { + // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new + // handler has been registered. The purpose of this path is to make unit tests that have multi-threading + // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting + // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at + // least sometimes. + pendingRedelivery.add(transfer) + null + } else { + h + } + } + if (deliverTo != null) { + return Pair(transfer, deliverTo) + } + } + return null + } + private fun pumpReceiveInternal(block: Boolean): MessageTransfer? { val q = getQueueForHandle(handle) - val transfer = (if (block) q.take() else q.poll()) ?: return null - val deliverTo = state.locked { - val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } - - if (h.isEmpty()) { - // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new - // handler has been registered. The purpose of this path is to make unit tests that have multi-threading - // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting - // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at - // least sometimes. - pendingRedelivery.add(transfer) - return null - } - - h + val next = getNextQueue(q, block) + if (next == null) { + return null } + val (transfer, deliverTo) = next for (handler in deliverTo) { // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt index a8337f40c4..d939d93015 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/InMemoryMessagingTests.kt @@ -82,4 +82,37 @@ class InMemoryMessagingTests { network.runNetwork(rounds = 1) assertEquals(3, counter) } + + /** + * Tests that unhandled messages in the received queue are skipped and the next message processed, rather than + * causing processing to return null as if there was no message. + */ + @Test + fun `skip unhandled messages`() { + val node1 = network.createNode() + val node2 = network.createNode() + var received: Int = 0 + + node1.net.addMessageHandler("valid_message") { msg, reg -> + received++ + } + + val invalidMessage = node2.net.createMessage("invalid_message", ByteArray(0)) + val validMessage = node2.net.createMessage("valid_message", ByteArray(0)) + node2.net.send(invalidMessage, node1.net.myAddress) + network.runNetwork() + assertEquals(0, received) + + node2.net.send(validMessage, node1.net.myAddress) + network.runNetwork() + assertEquals(1, received) + + // Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so + // this would fail. + node2.net.send(invalidMessage, node1.net.myAddress) + node2.net.send(validMessage, node1.net.myAddress) + network.runNetwork() + assertEquals(2, received) + + } }