diff --git a/node/src/main/kotlin/core/testing/IRSSimulation.kt b/node/src/main/kotlin/core/testing/IRSSimulation.kt index fa1437517e..b741788d68 100644 --- a/node/src/main/kotlin/core/testing/IRSSimulation.kt +++ b/node/src/main/kotlin/core/testing/IRSSimulation.kt @@ -126,9 +126,9 @@ class IRSSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwork } } - override fun iterate() { + override fun iterate(): InMemoryMessagingNetwork.MessageTransfer? { if (executeOnNextIteration.isNotEmpty()) executeOnNextIteration.removeAt(0)() - super.iterate() + return super.iterate() } } \ No newline at end of file diff --git a/node/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt index 2dcb728d48..ff4068d3c6 100644 --- a/node/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/core/testing/InMemoryMessagingNetwork.kt @@ -36,11 +36,16 @@ class InMemoryMessagingNetwork { private var counter = 0 // -1 means stopped. private val handleEndpointMap = HashMap() + + data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) { + override fun toString() = "${message.topic} from '${sender.myAddress}' to '$recipients'" + } + // All messages are kept here until the messages are pumped off the queue by a caller to the node class. // Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have // been created yet. If the node identified by the given handle has gone away/been shut down then messages // stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network. - private val messageQueues = HashMap>() + private val messageQueues = HashMap>() val endpoints: List @Synchronized get() = handleEndpointMap.values.toList() @@ -73,9 +78,9 @@ class InMemoryMessagingNetwork { return Builder(manuallyPumped, Handle(id, description ?: "In memory node $id")) } - private val _allMessages = PublishSubject.create>() + private val _allMessages = PublishSubject.create() /** A stream of (sender, message, recipients) triples */ - val allMessages: Observable> = _allMessages + val allMessages: Observable = _allMessages interface LatencyCalculator { fun between(sender: SingleMessageRecipient, receiver: SingleMessageRecipient): Duration @@ -87,28 +92,29 @@ class InMemoryMessagingNetwork { @Synchronized private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { - log.trace { "${message.topic} from '${from.myAddress}' to '$recipients'" } + val transfer = MessageTransfer(from, message, recipients) + log.trace { transfer.toString() } val calc = latencyCalculator if (calc != null && recipients is SingleMessageRecipient) { // Inject some artificial latency. timer.schedule(calc.between(from.myAddress, recipients).toMillis()) { - msgSendInternal(message, recipients) + msgSendInternal(transfer) } } else { - msgSendInternal(message, recipients) + msgSendInternal(transfer) } - _allMessages.onNext(Triple(from.myAddress, message, recipients)) + _allMessages.onNext(MessageTransfer(from, message, recipients)) } - private fun msgSendInternal(message: Message, recipients: MessageRecipients) { - when (recipients) { - is Handle -> getQueueForHandle(recipients).add(message) + private fun msgSendInternal(transfer: MessageTransfer) { + when (transfer.recipients) { + is Handle -> getQueueForHandle(transfer.recipients).add(transfer) is AllPossibleRecipients -> { // This means all possible recipients _that the network knows about at the time_, not literally everyone // who joins into the indefinite future. for (handle in handleEndpointMap.keys) - getQueueForHandle(handle).add(message) + getQueueForHandle(handle).add(transfer) } else -> throw IllegalArgumentException("Unknown type of recipient handle") } @@ -170,7 +176,7 @@ class InMemoryMessagingNetwork { protected inner class InnerState { val handlers: MutableList = ArrayList() - val pendingRedelivery = LinkedList() + val pendingRedelivery = LinkedList() } protected val state = ThreadBox(InnerState()) @@ -197,7 +203,7 @@ class InMemoryMessagingNetwork { Pair(handler, items) } for (it in items) - msgSend(this, it, handle) + msgSend(this, it.message, handle) return handler } @@ -237,19 +243,21 @@ class InMemoryMessagingNetwork { * Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block * is true, waits until one has been provided on a different thread via send. If block is false, the return * result indicates whether a message was delivered or not. + * + * @return the message that was processed, if any in this round. */ - fun pump(block: Boolean): Boolean { + fun pump(block: Boolean): MessageTransfer? { check(manuallyPumped) check(running) return pumpInternal(block) } - private fun pumpInternal(block: Boolean): Boolean { + private fun pumpInternal(block: Boolean): MessageTransfer? { val q = getQueueForHandle(handle) - val message = (if (block) q.take() else q.poll()) ?: return false + val transfer = (if (block) q.take() else q.poll()) ?: return null val deliverTo = state.locked { - val h = handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic } + val h = handlers.filter { if (it.topic.isBlank()) true else transfer.message.topic == it.topic } if (h.isEmpty()) { // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new @@ -257,8 +265,8 @@ class InMemoryMessagingNetwork { // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at // least sometimes. - pendingRedelivery.add(message) - return false + pendingRedelivery.add(transfer) + return null } h @@ -268,14 +276,14 @@ class InMemoryMessagingNetwork { // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. (handler.executor ?: MoreExecutors.directExecutor()).execute { try { - handler.callback(message, handler) + handler.callback(transfer.message, handler) } catch(e: Exception) { loggerFor().error("Caught exception in handler for $this/${handler.topic}", e) } } } - return true + return transfer } } } diff --git a/node/src/main/kotlin/core/testing/MockNode.kt b/node/src/main/kotlin/core/testing/MockNode.kt index f66593be35..e4ed552616 100644 --- a/node/src/main/kotlin/core/testing/MockNode.kt +++ b/node/src/main/kotlin/core/testing/MockNode.kt @@ -131,8 +131,9 @@ class MockNetwork(private val threadPerNode: Boolean = false, */ fun runNetwork(rounds: Int = -1) { fun pumpAll() = messagingNetwork.endpoints.map { it.pump(false) } + if (rounds == -1) - while (pumpAll().any { it }) { + while (pumpAll().any { it != null }) { } else repeat(rounds) { pumpAll() } diff --git a/node/src/main/kotlin/core/testing/Simulation.kt b/node/src/main/kotlin/core/testing/Simulation.kt index edfe3c6cbb..f58b95eeda 100644 --- a/node/src/main/kotlin/core/testing/Simulation.kt +++ b/node/src/main/kotlin/core/testing/Simulation.kt @@ -175,18 +175,22 @@ abstract class Simulation(val runAsync: Boolean, * will carry on from where this one stopped. In an environment where you want to take actions between anything * interesting happening, or control the precise speed at which things operate (beyond the latency injector), this * is a useful way to do things. + * + * @return the message that was processed, or null if no node accepted a message in this round. */ - open fun iterate() { + open fun iterate(): InMemoryMessagingNetwork.MessageTransfer? { // Keep going until one of the nodes has something to do, or we have checked every node. val endpoints = network.messagingNetwork.endpoints var countDown = endpoints.size while (countDown > 0) { val handledMessage = endpoints[pumpCursor].pump(false) - if (handledMessage) break + if (handledMessage != null) + return handledMessage // If this node had nothing to do, advance the cursor with wraparound and try again. pumpCursor = (pumpCursor + 1) % endpoints.size countDown-- } + return null } protected fun linkProtocolProgress(node: SimulatedNode, protocol: ProtocolLogic<*>) { diff --git a/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt index a9fd7feeb3..45253076d1 100644 --- a/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/core/messaging/TwoPartyTradeProtocolTests.kt @@ -36,6 +36,7 @@ import java.util.jar.JarOutputStream import java.util.zip.ZipEntry import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertNotNull import kotlin.test.assertTrue /** @@ -171,7 +172,7 @@ class TwoPartyTradeProtocolTests { // Alice doesn't know that and carries on: she wants to know about the cash transactions he's trying to use. // She will wait around until Bob comes back. - assertTrue(pumpAlice()) + assertNotNull(pumpAlice()) // ... bring the node back up ... the act of constructing the SMM will re-register the message handlers // that Bob was waiting on before the reboot occurred.