From 65c5fa75022f61fa0b30d13f3122be9c3c610a80 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Fri, 11 Dec 2015 14:54:23 +0100 Subject: [PATCH] Messaging: (mock) in memory network implementation improvements. - Now supports simulated restart of nodes - Messages sent to non-running nodes are queued for delivery - Messages received by a node that don't match any topics are queued until handlers are registered These improvements help us unit test various robustness features and ensure things work in truly concurrent context where there can be race conditions like receiving a message before the node had a chance to register the right handlers. --- .../kotlin/core/messaging/InMemoryNetwork.kt | 145 ++++++++++++------ src/main/kotlin/core/messaging/Messaging.kt | 13 +- .../core/messaging/InMemoryMessagingTests.kt | 52 ++++++- 3 files changed, 151 insertions(+), 59 deletions(-) diff --git a/src/main/kotlin/core/messaging/InMemoryNetwork.kt b/src/main/kotlin/core/messaging/InMemoryNetwork.kt index a9f07638cb..7511c9e25c 100644 --- a/src/main/kotlin/core/messaging/InMemoryNetwork.kt +++ b/src/main/kotlin/core/messaging/InMemoryNetwork.kt @@ -12,8 +12,6 @@ import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors import core.sha256 -import core.utilities.loggerFor -import core.utilities.trace import java.time.Instant import java.util.* import java.util.concurrent.Executor @@ -22,6 +20,7 @@ import javax.annotation.concurrent.GuardedBy import javax.annotation.concurrent.ThreadSafe import kotlin.concurrent.currentThread import kotlin.concurrent.thread +import kotlin.test.fail /** * An in-memory network allows you to manufacture [Node]s for a set of participants. Each @@ -32,12 +31,13 @@ import kotlin.concurrent.thread */ @ThreadSafe public class InMemoryNetwork { - companion object { - private val L = loggerFor() - } - - @GuardedBy("this") private var counter = 0 // -1 means stopped. - private val networkMap: MutableMap = Collections.synchronizedMap(HashMap()) + private var counter = 0 // -1 means stopped. + private val networkMap = HashMap() + // All messages are kept here until the messages are pumped off the queue by a caller to the node class. + // Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have + // been created yet. If the node identified by the given handle has gone away/been shut down then messages + // stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network. + private val messageQueues = HashMap>() /** * Creates a node and returns the new object that identifies its location on the network to senders, and the @@ -49,35 +49,69 @@ public class InMemoryNetwork { * executor. */ @Synchronized - fun createNode(manuallyPumped: Boolean): Pair> { + fun createNode(manuallyPumped: Boolean): Pair> { check(counter >= 0) { "In memory network stopped: please recreate. "} - - val id = InMemoryNodeHandle(counter) + val builder = createNodeWithID(manuallyPumped, counter) as Builder counter++ - return Pair(id, Builder(manuallyPumped, id)) + val id = builder.id + return Pair(id, builder) } - val entireNetwork: AllPossibleRecipients = object : AllPossibleRecipients {} + /** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */ + fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingSystemBuilder { + return Builder(manuallyPumped, Handle(id)) + } + + @Synchronized + private fun netSend(message: Message, recipients: MessageRecipients) { + when (recipients) { + is Handle -> getQueueForHandle(recipients).add(message) + + is AllPossibleRecipients -> { + // This means all possible recipients _that the network knows about at the time_, not literally everyone + // who joins into the indefinite future. + for (handle in networkMap.keys) + getQueueForHandle(handle).add(message) + } + else -> fail("Unknown type of recipient handle") + } + } + + @Synchronized + private fun netNodeHasShutdown(handle: Handle) { + networkMap.remove(handle) + } + + @Synchronized + private fun getQueueForHandle(recipients: Handle) = messageQueues.getOrPut(recipients) { LinkedBlockingQueue() } + + val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {} @Synchronized fun stop() { - for (node in networkMap.values) { + // toArrayList here just copies the collection, which we need because node.stop() will delete itself from + // the network map by calling netNodeHasShutdown. So we would get a CoModException if we didn't copy first. + for (node in networkMap.values.toArrayList()) node.stop() - } + counter = -1 + networkMap.clear() + messageQueues.clear() } - private inner class Builder(val manuallyPumped: Boolean, val id: InMemoryNodeHandle) : MessagingSystemBuilder { + inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingSystemBuilder { override fun start(): ListenableFuture { - val node = Node(manuallyPumped) - networkMap[id] = node - return Futures.immediateFuture(node) + synchronized(this@InMemoryNetwork) { + val node = Node(manuallyPumped, id) + networkMap[id] = node + return Futures.immediateFuture(node) + } } } - private class InMemoryNodeHandle(val id: Int) : SingleMessageRecipient { + class Handle(val id: Int) : SingleMessageRecipient { override fun toString() = "In memory node $id" - override fun equals(other: Any?) = other is InMemoryNodeHandle && other.id == id + override fun equals(other: Any?) = other is Handle && other.id == id override fun hashCode() = id.hashCode() } @@ -88,52 +122,55 @@ public class InMemoryNetwork { * * An instance can be obtained by creating a builder and then using the start method. */ - inner class Node(private val manuallyPumped: Boolean): MessagingSystem { + inner class Node(private val manuallyPumped: Boolean, private val handle: Handle): MessagingSystem { inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @GuardedBy("this") protected val handlers: MutableList = ArrayList() @GuardedBy("this") protected var running = true - protected val q = LinkedBlockingQueue() + @GuardedBy("this") + protected val pendingRedelivery = LinkedList() protected val backgroundThread = if (manuallyPumped) null else thread(isDaemon = true, name = "In-memory message dispatcher ") { - while (!currentThread.isInterrupted) pumpInternal(true) + while (!currentThread.isInterrupted) { + try { + pumpInternal(true) + } catch(e: InterruptedException) { + if (synchronized(this) { running }) + throw e + } + } } @Synchronized override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { - check(running) - return Handler(executor, topic, callback).apply { handlers.add(this) } + checkRunning() + val handler = Handler(executor, topic, callback).apply { handlers.add(this) } + if (pendingRedelivery.isNotEmpty()) { + val items = ArrayList(pendingRedelivery) + pendingRedelivery.clear() + items.forEach { netSend(it, handle) } + } + return handler } @Synchronized override fun removeMessageHandler(registration: MessageHandlerRegistration) { - check(running) + checkRunning() check(handlers.remove(registration as Handler)) } @Synchronized override fun send(message: Message, target: MessageRecipients) { - check(running) - L.trace { "Sending message of topic '${message.topic}' to '$target'" } - when (target) { - is InMemoryNodeHandle -> { - val node = networkMap[target] ?: throw IllegalArgumentException("Unknown message recipient: $target") - node.q.put(message) - } - entireNetwork -> { - for (node in networkMap.values) { - node.q.put(message) - } - } - else -> throw IllegalArgumentException("Unhandled type of target: $target") - } + checkRunning() + netSend(message, target) } @Synchronized override fun stop() { - backgroundThread?.interrupt() running = false + backgroundThread?.interrupt() + netNodeHasShutdown(handle) } /** Returns the given (topic, data) pair as a newly created message object.*/ @@ -151,16 +188,22 @@ public class InMemoryNetwork { /** * Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block - * is true, waits until one has been provided on a different thread via send. If block is false, the return result - * indicates whether a message was delivered or not. + * is true, waits until one has been provided on a different thread via send. If block is false, the return + * result indicates whether a message was delivered or not. */ fun pump(block: Boolean): Boolean { check(manuallyPumped) - synchronized(this) { check(running) } + checkRunning() return pumpInternal(block) } + @Synchronized + private fun checkRunning() { + check(running) + } + private fun pumpInternal(block: Boolean): Boolean { + val q = getQueueForHandle(handle) val message = if (block) q.take() else q.poll() if (message == null) @@ -170,6 +213,18 @@ public class InMemoryNetwork { handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic } } + if (deliverTo.isEmpty()) { + // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new + // handler has been registered. The purpose of this path is to make unit tests that have multi-threading + // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting + // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at + // least sometimes. + synchronized(this) { + pendingRedelivery.add(message) + } + return false + } + for (handler in deliverTo) { // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. (handler.executor ?: MoreExecutors.directExecutor()).execute { handler.callback(message, handler) } diff --git a/src/main/kotlin/core/messaging/Messaging.kt b/src/main/kotlin/core/messaging/Messaging.kt index 58e6021f19..a3d8972b36 100644 --- a/src/main/kotlin/core/messaging/Messaging.kt +++ b/src/main/kotlin/core/messaging/Messaging.kt @@ -9,7 +9,6 @@ package core.messaging import com.google.common.util.concurrent.ListenableFuture -import core.serialization.SerializeableWithKryo import core.serialization.deserialize import core.serialization.serialize import java.time.Duration @@ -85,15 +84,15 @@ fun MessagingSystem.runOnNextMessage(topic: String = "", executor: Executor? = n } } -fun MessagingSystem.send(topic: String, to: MessageRecipients, obj: SerializeableWithKryo) = send(createMessage(topic, obj.serialize()), to) +fun MessagingSystem.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize()), to) /** * Registers a handler for the given topic that runs the given callback with the message content deserialised to the * given type, and then removes itself. */ -inline fun MessagingSystem.runOnNextMessageWith(topic: String = "", - executor: Executor? = null, - noinline callback: (T) -> Unit) { +inline fun MessagingSystem.runOnNextMessageWith(topic: String = "", + executor: Executor? = null, + noinline callback: (T) -> Unit) { addMessageHandler(topic, executor) { msg, reg -> callback(msg.data.deserialize()) removeMessageHandler(reg) @@ -109,8 +108,8 @@ inline fun MessagingSystem.runOnNextMessageW * A specific implementation of the controller class will have extra features that let you customise it before starting * it up. */ -interface MessagingSystemBuilder { - fun start(): ListenableFuture +interface MessagingSystemBuilder { + fun start(): ListenableFuture } interface MessageHandlerRegistration diff --git a/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt b/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt index 971b79c78c..db3b212918 100644 --- a/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt +++ b/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt @@ -10,21 +10,23 @@ package core.messaging +import core.serialization.deserialize import org.junit.After import org.junit.Before import org.junit.Test import java.util.* import kotlin.test.assertEquals import kotlin.test.assertFails +import kotlin.test.assertFalse import kotlin.test.assertTrue open class TestWithInMemoryNetwork { - val nodes: MutableMap = HashMap() + val nodes: MutableMap = HashMap() lateinit var network: InMemoryNetwork - fun makeNode(): Pair { + fun makeNode(inBackground: Boolean = false): Pair { // The manuallyPumped = true bit means that we must call the pump method on the system in order to - val (address, builder) = network.createNode(manuallyPumped = true) + val (address, builder) = network.createNode(!inBackground) val node = builder.start().get() nodes[address] = node return Pair(address, node) @@ -41,12 +43,12 @@ open class TestWithInMemoryNetwork { network.stop() } - fun pumpAll() = nodes.values.map { it.pump(false) } + fun pumpAll(blocking: Boolean) = nodes.values.map { it.pump(blocking) } - // Keep calling "pump" in rounds until every node in the network reports that it had nothing to do. + // Keep calling "pump" in rounds until every node in the network reports that it had nothing to do fun runNetwork(body: () -> T): T { val result = body() - while (pumpAll().any { it }) {} + while (pumpAll(false).any { it }) {} return result } } @@ -111,8 +113,44 @@ class InMemoryMessagingTests : TestWithInMemoryNetwork() { var counter = 0 listOf(node1, node2, node3).forEach { it.addMessageHandler { msg, registration -> counter++ } } runNetwork { - node1.send(node2.createMessage("test.topic", bits), network.entireNetwork) + node1.send(node2.createMessage("test.topic", bits), network.everyoneOnline) } assertEquals(3, counter) } + + @Test + fun downAndUp() { + // Test (re)delivery of messages to nodes that aren't created yet, or were stopped and then restarted. + // The purpose of this functionality is to simulate a reliable messaging system that keeps trying until + // messages are delivered. + val (addr1, node1) = makeNode() + var (addr2, node2) = makeNode() + + node1.send("test.topic", addr2, "hello!") + node2.pump(false) // No handler registered, so the message goes into a holding area. + var runCount = 0 + node2.addMessageHandler("test.topic") { msg, registration -> + if (msg.data.deserialize() == "hello!") + runCount++ + } + node2.pump(false) // Try again now the handler is registered + assertEquals(1, runCount) + + // Shut node2 down for a while. Node 1 keeps sending it messages though. + node2.stop() + + node1.send("test.topic", addr2, "are you there?") + node1.send("test.topic", addr2, "wake up!") + + // Now re-create node2 with the same address as last time, and re-register a message handler. + // Check that the messages that were sent whilst it was gone are still there, waiting for it. + node2 = network.createNodeWithID(true, addr2.id).start().get() + node2.addMessageHandler("test.topic") { a, b -> runCount++ } + assertTrue(node2.pump(false)) + assertEquals(2, runCount) + assertTrue(node2.pump(false)) + assertEquals(3, runCount) + assertFalse(node2.pump(false)) + assertEquals(3, runCount) + } } \ No newline at end of file