diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt index ee94d6eaf7..a91a1e159e 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/Simulation.kt @@ -214,7 +214,8 @@ abstract class Simulation(val runAsync: Boolean, } } - val networkInitialisationFinished: ListenableFuture = Futures.transform(Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }), Function { }) + val networkInitialisationFinished: ListenableFuture = + Futures.transform(Futures.allAsList(network.nodes.map { it.networkMapRegistrationFuture }), Function { }) fun start(): ListenableFuture { network.startNodes() diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index fa5e61e6f0..2a968c542b 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -104,7 +104,6 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { } else { msgSendInternal(transfer) } - _allMessages.onNext(MessageTransfer(from, message, recipients)) } private fun msgSendInternal(transfer: MessageTransfer) { @@ -256,7 +255,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { private fun pumpInternal(block: Boolean): MessageTransfer? { val q = getQueueForHandle(handle) val transfer = (if (block) q.take() else q.poll()) ?: return null - + System.err.println("T " + transfer.hashCode()) val deliverTo = state.locked { val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } @@ -277,6 +276,7 @@ class InMemoryMessagingNetwork() : SingletonSerializeAsToken() { // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. (handler.executor ?: MoreExecutors.directExecutor()).execute { try { + _allMessages.onNext(transfer) handler.callback(transfer.message, handler) } catch(e: Exception) { loggerFor().error("Caught exception in handler for $this/${handler.topic}", e)