mirror of
https://github.com/corda/corda.git
synced 2025-01-01 18:56:44 +00:00
Correct processing of unhandled messages
Unhandled messages in the in memory messaging network can disrupt runNetwork(), as they result in getNextQueue() returning null, irrespective of whether there is further work which could be done. This modifies the flow to loop through the remaining transfers on the queue before giving up, rather than stopping after the first.
This commit is contained in:
parent
c442cd01a7
commit
c92d51a0b6
@ -283,24 +283,46 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
|||||||
return pumpReceiveInternal(block)
|
return pumpReceiveInternal(block)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the next transfer, and matching queue, that is ready to handle. Any pending transfers without handlers
|
||||||
|
* are placed into `pendingRedelivery` to try again later.
|
||||||
|
*
|
||||||
|
* @param block if this should block until a message it can process.
|
||||||
|
*/
|
||||||
|
private fun getNextQueue(q: LinkedBlockingQueue<MessageTransfer>, block: Boolean): Pair<MessageTransfer, List<Handler>>? {
|
||||||
|
var deliverTo: List<Handler>? = null
|
||||||
|
// Pop transfers off the queue until we run out (and are not blocking), or find something we can process
|
||||||
|
while (deliverTo == null) {
|
||||||
|
val transfer = (if (block) q.take() else q.poll()) ?: return null
|
||||||
|
deliverTo = state.locked {
|
||||||
|
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
|
||||||
|
|
||||||
|
if (h.isEmpty()) {
|
||||||
|
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
|
||||||
|
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
|
||||||
|
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
|
||||||
|
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
|
||||||
|
// least sometimes.
|
||||||
|
pendingRedelivery.add(transfer)
|
||||||
|
null
|
||||||
|
} else {
|
||||||
|
h
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (deliverTo != null) {
|
||||||
|
return Pair(transfer, deliverTo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
|
||||||
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
|
private fun pumpReceiveInternal(block: Boolean): MessageTransfer? {
|
||||||
val q = getQueueForHandle(handle)
|
val q = getQueueForHandle(handle)
|
||||||
val transfer = (if (block) q.take() else q.poll()) ?: return null
|
val next = getNextQueue(q, block)
|
||||||
val deliverTo = state.locked {
|
if (next == null) {
|
||||||
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
|
return null
|
||||||
|
|
||||||
if (h.isEmpty()) {
|
|
||||||
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
|
|
||||||
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
|
|
||||||
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
|
|
||||||
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
|
|
||||||
// least sometimes.
|
|
||||||
pendingRedelivery.add(transfer)
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
h
|
|
||||||
}
|
}
|
||||||
|
val (transfer, deliverTo) = next
|
||||||
|
|
||||||
for (handler in deliverTo) {
|
for (handler in deliverTo) {
|
||||||
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
|
||||||
|
@ -82,4 +82,37 @@ class InMemoryMessagingTests {
|
|||||||
network.runNetwork(rounds = 1)
|
network.runNetwork(rounds = 1)
|
||||||
assertEquals(3, counter)
|
assertEquals(3, counter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that unhandled messages in the received queue are skipped and the next message processed, rather than
|
||||||
|
* causing processing to return null as if there was no message.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
fun `skip unhandled messages`() {
|
||||||
|
val node1 = network.createNode()
|
||||||
|
val node2 = network.createNode()
|
||||||
|
var received: Int = 0
|
||||||
|
|
||||||
|
node1.net.addMessageHandler("valid_message") { msg, reg ->
|
||||||
|
received++
|
||||||
|
}
|
||||||
|
|
||||||
|
val invalidMessage = node2.net.createMessage("invalid_message", ByteArray(0))
|
||||||
|
val validMessage = node2.net.createMessage("valid_message", ByteArray(0))
|
||||||
|
node2.net.send(invalidMessage, node1.net.myAddress)
|
||||||
|
network.runNetwork()
|
||||||
|
assertEquals(0, received)
|
||||||
|
|
||||||
|
node2.net.send(validMessage, node1.net.myAddress)
|
||||||
|
network.runNetwork()
|
||||||
|
assertEquals(1, received)
|
||||||
|
|
||||||
|
// Here's the core of the test; previously the unhandled message would cause runNetwork() to abort early, so
|
||||||
|
// this would fail.
|
||||||
|
node2.net.send(invalidMessage, node1.net.myAddress)
|
||||||
|
node2.net.send(validMessage, node1.net.myAddress)
|
||||||
|
network.runNetwork()
|
||||||
|
assertEquals(2, received)
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user