From e29a9b15c3b340e41257528ca3884b2ef6a694d0 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 20 Jun 2016 17:50:19 +0100 Subject: [PATCH] Revert "node: remove buffering logic from InMemoryMessagingNetwork" This reverts commit bc7ea5f0c5a61e9c3f9d7825e2944e4d27141320. --- .../network/InMemoryMessagingNetwork.kt | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index b297295d36..65f22209f9 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -203,6 +203,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria protected inner class InnerState { val handlers: MutableList = ArrayList() + val pendingRedelivery = LinkedList() } protected val state = ThreadBox(InnerState()) @@ -222,9 +223,16 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { check(running) - return state.locked { - Handler(executor, topic, callback).apply { handlers.add(this) } + val (handler, items) = state.locked { + val handler = Handler(executor, topic, callback).apply { handlers.add(this) } + val items = ArrayList(pendingRedelivery) + pendingRedelivery.clear() + Pair(handler, items) } + for (it in items) { + send(it.message, handle) + } + return handler } override fun removeMessageHandler(registration: MessageHandlerRegistration) { @@ -282,7 +290,12 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } if (h.isEmpty()) { - loggerFor().warn("No handlers for message ${transfer}") + // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new + // handler has been registered. The purpose of this path is to make unit tests that have multi-threading + // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting + // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at + // least sometimes. + pendingRedelivery.add(transfer) return null }