diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index b297295d36..65f22209f9 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -203,6 +203,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria protected inner class InnerState { val handlers: MutableList = ArrayList() + val pendingRedelivery = LinkedList() } protected val state = ThreadBox(InnerState()) @@ -222,9 +223,16 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { check(running) - return state.locked { - Handler(executor, topic, callback).apply { handlers.add(this) } + val (handler, items) = state.locked { + val handler = Handler(executor, topic, callback).apply { handlers.add(this) } + val items = ArrayList(pendingRedelivery) + pendingRedelivery.clear() + Pair(handler, items) } + for (it in items) { + send(it.message, handle) + } + return handler } override fun removeMessageHandler(registration: MessageHandlerRegistration) { @@ -282,7 +290,12 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } if (h.isEmpty()) { - loggerFor().warn("No handlers for message ${transfer}") + // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new + // handler has been registered. The purpose of this path is to make unit tests that have multi-threading + // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting + // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at + // least sometimes. + pendingRedelivery.add(transfer) return null }