Testing: expose the full message transfer record (with sender and recipients) when iterating a mock network and simulation.

This is useful for the visualiser to exclude uninteresting interactions.
This commit is contained in:
Mike Hearn 2016-05-11 19:02:41 +02:00 committed by Mike Hearn
parent 5de2ba4ef9
commit 8bcc6bdf1c
5 changed files with 41 additions and 27 deletions

View File

@ -126,9 +126,9 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork
}
}
override fun iterate() {
override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {
if (executeOnNextIteration.isNotEmpty())
executeOnNextIteration.removeAt(0)()
super.iterate()
return super.iterate()
}
}

View File

@ -36,11 +36,16 @@ class InMemoryMessagingNetwork {
private var counter = 0 // -1 means stopped.
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) {
override fun toString() = "${message.topic} from '${sender.myAddress}' to '$recipients'"
}
// All messages are kept here until the messages are pumped off the queue by a caller to the node class.
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
// been created yet. If the node identified by the given handle has gone away/been shut down then messages
// stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<Message>>()
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<MessageTransfer>>()
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
@ -73,9 +78,9 @@ class InMemoryMessagingNetwork {
return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id"))
}
private val _allMessages = PublishSubject.create<Triple<SingleMessageRecipient, Message, MessageRecipients>>()
private val _allMessages = PublishSubject.create<MessageTransfer>()
/** A stream of (sender, message, recipients) triples */
val allMessages: Observable<Triple<SingleMessageRecipient, Message, MessageRecipients>> = _allMessages
val allMessages: Observable<MessageTransfer> = _allMessages
interface LatencyCalculator {
fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration
@ -87,28 +92,29 @@ class InMemoryMessagingNetwork {
@Synchronized
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
log.trace { "${message.topic} from '${from.myAddress}' to '$recipients'" }
val transfer = MessageTransfer(from, message, recipients)
log.trace { transfer.toString() }
val calc = latencyCalculator
if (calc != null && recipients is SingleMessageRecipient) {
// Inject some artificial latency.
timer.schedule(calc.between(from.myAddress, recipients).toMillis()) {
msgSendInternal(message, recipients)
msgSendInternal(transfer)
}
} else {
msgSendInternal(message, recipients)
msgSendInternal(transfer)
}
_allMessages.onNext(Triple(from.myAddress, message, recipients))
_allMessages.onNext(MessageTransfer(from, message, recipients))
}
private fun msgSendInternal(message: Message, recipients: MessageRecipients) {
when (recipients) {
is Handle -> getQueueForHandle(recipients).add(message)
private fun msgSendInternal(transfer: MessageTransfer) {
when (transfer.recipients) {
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
for (handle in handleEndpointMap.keys)
getQueueForHandle(handle).add(message)
getQueueForHandle(handle).add(transfer)
}
else -> throw IllegalArgumentException("Unknown type of recipient handle")
}
@ -170,7 +176,7 @@ class InMemoryMessagingNetwork {
protected inner class InnerState {
val handlers: MutableList<Handler> = ArrayList()
val pendingRedelivery = LinkedList<Message>()
val pendingRedelivery = LinkedList<MessageTransfer>()
}
protected val state = ThreadBox(InnerState())
@ -197,7 +203,7 @@ class InMemoryMessagingNetwork {
Pair(handler, items)
}
for (it in items)
msgSend(this, it, handle)
msgSend(this, it.message, handle)
return handler
}
@ -237,19 +243,21 @@ class InMemoryMessagingNetwork {
* Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block
* is true, waits until one has been provided on a different thread via send. If block is false, the return
* result indicates whether a message was delivered or not.
*
* @return the message that was processed, if any in this round.
*/
fun pump(block: Boolean): Boolean {
fun pump(block: Boolean): MessageTransfer? {
check(manuallyPumped)
check(running)
return pumpInternal(block)
}
private fun pumpInternal(block: Boolean): Boolean {
private fun pumpInternal(block: Boolean): MessageTransfer? {
val q = getQueueForHandle(handle)
val message = (if (block) q.take() else q.poll()) ?: return false
val transfer = (if (block) q.take() else q.poll()) ?: return null
val deliverTo = state.locked {
val h = handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }
val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic }
if (h.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
@ -257,8 +265,8 @@ class InMemoryMessagingNetwork {
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
pendingRedelivery.add(message)
return false
pendingRedelivery.add(transfer)
return null
}
h
@ -268,14 +276,14 @@ class InMemoryMessagingNetwork {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute {
try {
handler.callback(message, handler)
handler.callback(transfer.message, handler)
} catch(e: Exception) {
loggerFor<InMemoryMessagingNetwork>().error("Caught exception in handler for $this/${handler.topic}", e)
}
}
}
return true
return transfer
}
}
}

View File

@ -131,8 +131,9 @@ class MockNetwork(private val threadPerNode: Boolean = false,
*/
fun runNetwork(rounds: Int = -1) {
fun pumpAll() = messagingNetwork.endpoints.map { it.pump(false) }
if (rounds == -1)
while (pumpAll().any { it }) {
while (pumpAll().any { it != null }) {
}
else
repeat(rounds) { pumpAll() }

View File

@ -175,18 +175,22 @@ abstract class Simulation(val runAsync: Boolean,
* will carry on from where this one stopped. In an environment where you want to take actions between anything
* interesting happening, or control the precise speed at which things operate (beyond the latency injector), this
* is a useful way to do things.
*
* @return the message that was processed, or null if no node accepted a message in this round.
*/
open fun iterate() {
open fun iterate(): InMemoryMessagingNetwork.MessageTransfer? {
// Keep going until one of the nodes has something to do, or we have checked every node.
val endpoints = network.messagingNetwork.endpoints
var countDown = endpoints.size
while (countDown > 0) {
val handledMessage = endpoints[pumpCursor].pump(false)
if (handledMessage) break
if (handledMessage != null)
return handledMessage
// If this node had nothing to do, advance the cursor with wraparound and try again.
pumpCursor = (pumpCursor + 1) % endpoints.size
countDown--
}
return null
}
protected fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) {

View File

@ -36,6 +36,7 @@ import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
/**
@ -171,7 +172,7 @@ class TwoPartyTradeProtocolTests {
// Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use.
// She will wait around until Bob comes back.
assertTrue(pumpAlice())
assertNotNull(pumpAlice())
// ... bring the node back up ... the act of constructing the SMM will re-register the message handlers
// that Bob was waiting on before the reboot occurred.