Revert "node: remove buffering logic from InMemoryMessagingNetwork"

This reverts commit bc7ea5f0c5a61e9c3f9d7825e2944e4d27141320.
This commit is contained in:
Andras Slemmer 2016-06-20 17:50:19 +01:00
parent 39d60bc74b
commit e29a9b15c3

View File

@ -203,6 +203,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
protected inner class InnerState { protected inner class InnerState {
val handlers: MutableList<Handler> = ArrayList() val handlers: MutableList<Handler> = ArrayList()
val pendingRedelivery = LinkedList<MessageTransfer>()
} }
protected val state = ThreadBox(InnerState()) protected val state = ThreadBox(InnerState())
@ -222,9 +223,16 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running) check(running)
return state.locked { val (handler, items) = state.locked {
Handler(executor, topic, callback).apply { handlers.add(this) } val handler = Handler(executor, topic, callback).apply { handlers.add(this) }
val items = ArrayList(pendingRedelivery)
pendingRedelivery.clear()
Pair(handler, items)
} }
for (it in items) {
send(it.message, handle)
}
return handler
} }
override fun removeMessageHandler(registration: MessageHandlerRegistration) { override fun removeMessageHandler(registration: MessageHandlerRegistration) {
@ -282,7 +290,12 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
if (h.isEmpty()) { if (h.isEmpty()) {
loggerFor<InMemoryMessagingNetwork>().warn("No handlers for message ${transfer}") // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
pendingRedelivery.add(transfer)
return null return null
} }