From 3bf95429e948f9db07b9d57a3264f20eed881147 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 8 Jun 2016 17:28:41 +0100 Subject: [PATCH] node: change InMemoryMessagingNetwork so that the exposed allMessages stream is in sync with iterate() --- .../kotlin/com/r3corda/node/internal/testing/Simulation.kt | 3 ++- .../r3corda/node/services/network/InMemoryMessagingNetwork.kt | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt index ee94d6eaf7..a91a1e159e 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt @@ -214,7 +214,8 @@ abstract class Simulation(val runAsync: Boolean, } } - val networkInitialisationFinished: ListenableFuture = Futures.transform(Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }), Function { }) + val networkInitialisationFinished: ListenableFuture = + Futures.transform(Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }), Function { }) fun start(): ListenableFuture { network.startNodes() diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index fa5e61e6f0..2a968c542b 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -104,7 +104,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { } else { msgSendInternal(transfer) } - _allMessages.onNext(MessageTransfer(from, message, recipients)) } private fun msgSendInternal(transfer: MessageTransfer) { @@ -256,7 +255,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { private fun pumpInternal(block: Boolean): MessageTransfer? { val q = getQueueForHandle(handle) val transfer = (if (block) q.take() else q.poll()) ?: return null - + System.err.println("T " + transfer.hashCode()) val deliverTo = state.locked { val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } @@ -277,6 +276,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. (handler.executor ?: MoreExecutors.directExecutor()).execute { try { + _allMessages.onNext(transfer) handler.callback(transfer.message, handler) } catch(e: Exception) { loggerFor().error("Caught exception in handler for $this/${handler.topic}", e)