node: change InMemoryMessagingNetwork so that the exposed allMessages stream is in sync with iterate()

This commit is contained in:
Andras Slemmer 2016-06-08 17:28:41 +01:00
parent 53bd5c2287
commit 3bf95429e9
2 changed files with 4 additions and 3 deletions

View File

@ -214,7 +214,8 @@ abstract class Simulation(val runAsync: Boolean,
}
}
val networkInitialisationFinished: ListenableFuture<Unit> = Futures.transform(Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }), Function { })
val networkInitialisationFinished: ListenableFuture<Unit> =
Futures.transform(Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }), Function { })
fun start(): ListenableFuture<Unit> {
network.startNodes()

View File

@ -104,7 +104,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
} else {
msgSendInternal(transfer)
}
_allMessages.onNext(MessageTransfer(from, message, recipients))
}
private fun msgSendInternal(transfer: MessageTransfer) {
@ -256,7 +255,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
private fun pumpInternal(block: Boolean): MessageTransfer? {
val q = getQueueForHandle(handle)
val transfer = (if (block) q.take() else q.poll()) ?: return null
System.err.println("T " + transfer.hashCode())
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
@ -277,6 +276,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {
try {
_allMessages.onNext(transfer)
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)