Support injection of latency into the mock (simulated) network.

This commit is contained in:
Mike Hearn 2016-03-23 16:48:16 +00:00
parent d1e62c27c4
commit 3adfd02e31
2 changed files with 36 additions and 4 deletions

View File

@ -11,18 +11,22 @@ package core.testing
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import core.node.services.DummyTimestampingAuthority
import core.ThreadBox
import core.crypto.sha256
import core.messaging.*
import core.node.services.DummyTimestampingAuthority
import core.node.services.LegallyIdentifiableNode
import core.node.services.NodeTimestamperService
import core.utilities.loggerFor
import rx.Observable
import rx.subjects.PublishSubject
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.schedule
import kotlin.concurrent.thread
/**
@ -67,8 +71,33 @@ class InMemoryMessagingNetwork {
return Builder(manuallyPumped, Handle(id))
}
private val _allMessages = PublishSubject.create<Triple<SingleMessageRecipient, Message, MessageRecipients>>()
/** A stream of (sender, message, recipients) triples */
val allMessages: Observable<Triple<SingleMessageRecipient, Message, MessageRecipients>> = _allMessages
interface LatencyCalculator {
fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration
}
/** This can be set to an object which can inject artificial latency between sender/recipient pairs. */
@Volatile var latencyCalculator: LatencyCalculator? = null
private val timer = Timer()
@Synchronized
private fun msgSend(message: Message, recipients: MessageRecipients) {
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
val calc = latencyCalculator
if (calc != null && recipients is SingleMessageRecipient) {
// Inject some artificial latency.
timer.schedule(calc.between(from.myAddress, recipients).toMillis()) {
msgSendInternal(from, message, recipients)
}
} else {
msgSendInternal(from, message, recipients)
}
_allMessages.onNext(Triple(from.myAddress, message, recipients))
}
private fun msgSendInternal(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
when (recipients) {
is Handle -> getQueueForHandle(recipients).add(message)
@ -174,7 +203,7 @@ class InMemoryMessagingNetwork {
Pair(handler, items)
}
for (it in items)
msgSend(it, handle)
msgSend(this, it, handle)
return handler
}
@ -185,7 +214,7 @@ class InMemoryMessagingNetwork {
override fun send(message: Message, target: MessageRecipients) {
check(running)
msgSend(message, target)
msgSend(this, message, target)
}
override fun stop() {

View File

@ -12,6 +12,7 @@ import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.MoreExecutors
import core.Party
import core.messaging.MessagingService
import core.messaging.SingleMessageRecipient
import core.node.AbstractNode
import core.node.NodeConfiguration
import core.node.services.FixedIdentityService
@ -133,4 +134,6 @@ class MockNetwork(private val threadPerNode: Boolean = false,
require(nodes.isEmpty())
return Pair(createNode(null, -1, nodeFactory), createNode(nodes[0].legallyIdentifableAddress, -1, nodeFactory))
}
fun addressToNode(address: SingleMessageRecipient): MockNode = nodes.single { it.net.myAddress == address }
}