From 3adfd02e3150ae2636b4f4d10aeb42ad1c178afb Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Wed, 23 Mar 2016 16:48:16 +0000 Subject: [PATCH] Support injection of latency into the mock (simulated) network. --- .../core/testing/InMemoryMessagingNetwork.kt | 37 +++++++++++++++++-- src/main/kotlin/core/testing/MockNode.kt | 3 ++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt b/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt index 0e8082bd6d..1320fc7a39 100644 --- a/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt +++ b/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt @@ -11,18 +11,22 @@ package core.testing import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors -import core.node.services.DummyTimestampingAuthority import core.ThreadBox import core.crypto.sha256 import core.messaging.* +import core.node.services.DummyTimestampingAuthority import core.node.services.LegallyIdentifiableNode import core.node.services.NodeTimestamperService import core.utilities.loggerFor +import rx.Observable +import rx.subjects.PublishSubject +import java.time.Duration import java.time.Instant import java.util.* import java.util.concurrent.Executor import java.util.concurrent.LinkedBlockingQueue import javax.annotation.concurrent.ThreadSafe +import kotlin.concurrent.schedule import kotlin.concurrent.thread /** @@ -67,8 +71,33 @@ class InMemoryMessagingNetwork { return Builder(manuallyPumped, Handle(id)) } + private val _allMessages = PublishSubject.create>() + /** A stream of (sender, message, recipients) triples */ + val allMessages: Observable> = _allMessages + + interface LatencyCalculator { + fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration + } + + /** This can be set to an object which can inject artificial latency between sender/recipient pairs. */ + @Volatile var latencyCalculator: LatencyCalculator? = null + private val timer = Timer() + @Synchronized - private fun msgSend(message: Message, recipients: MessageRecipients) { + private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { + val calc = latencyCalculator + if (calc != null && recipients is SingleMessageRecipient) { + // Inject some artificial latency. + timer.schedule(calc.between(from.myAddress, recipients).toMillis()) { + msgSendInternal(from, message, recipients) + } + } else { + msgSendInternal(from, message, recipients) + } + _allMessages.onNext(Triple(from.myAddress, message, recipients)) + } + + private fun msgSendInternal(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { when (recipients) { is Handle -> getQueueForHandle(recipients).add(message) @@ -174,7 +203,7 @@ class InMemoryMessagingNetwork { Pair(handler, items) } for (it in items) - msgSend(it, handle) + msgSend(this, it, handle) return handler } @@ -185,7 +214,7 @@ class InMemoryMessagingNetwork { override fun send(message: Message, target: MessageRecipients) { check(running) - msgSend(message, target) + msgSend(this, message, target) } override fun stop() { diff --git a/src/main/kotlin/core/testing/MockNode.kt b/src/main/kotlin/core/testing/MockNode.kt index e0effdaeb8..a5cb0319c0 100644 --- a/src/main/kotlin/core/testing/MockNode.kt +++ b/src/main/kotlin/core/testing/MockNode.kt @@ -12,6 +12,7 @@ import com.google.common.jimfs.Jimfs import com.google.common.util.concurrent.MoreExecutors import core.Party import core.messaging.MessagingService +import core.messaging.SingleMessageRecipient import core.node.AbstractNode import core.node.NodeConfiguration import core.node.services.FixedIdentityService @@ -133,4 +134,6 @@ class MockNetwork(private val threadPerNode: Boolean = false, require(nodes.isEmpty()) return Pair(createNode(null, -1, nodeFactory), createNode(nodes[0].legallyIdentifableAddress, -1, nodeFactory)) } + + fun addressToNode(address: SingleMessageRecipient): MockNode = nodes.single { it.net.myAddress == address } } \ No newline at end of file