Messaging: (mock) in memory network implementation improvements.

- Now supports simulated restart of nodes
- Messages sent to non-running nodes are queued for delivery
- Messages received by a node that don't match any topics are queued until handlers are registered

These improvements help us unit test various robustness features and ensure things work in truly concurrent context where there can be race conditions like receiving a message before the node had a chance to register the right handlers.
This commit is contained in:
Mike Hearn 2015-12-11 14:54:23 +01:00
parent 0ca47156bc
commit 65c5fa7502
3 changed files with 151 additions and 59 deletions

View File

@ -12,8 +12,6 @@ import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.MoreExecutors
import core.sha256 import core.sha256
import core.utilities.loggerFor
import core.utilities.trace
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.Executor import java.util.concurrent.Executor
@ -22,6 +20,7 @@ import javax.annotation.concurrent.GuardedBy
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.currentThread import kotlin.concurrent.currentThread
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.test.fail
/** /**
* An in-memory network allows you to manufacture [Node]s for a set of participants. Each * An in-memory network allows you to manufacture [Node]s for a set of participants. Each
@ -32,12 +31,13 @@ import kotlin.concurrent.thread
*/ */
@ThreadSafe @ThreadSafe
public class InMemoryNetwork { public class InMemoryNetwork {
companion object { private var counter = 0 // -1 means stopped.
private val L = loggerFor<InMemoryNetwork>() private val networkMap = HashMap<Handle, Node>()
} // All messages are kept here until the messages are pumped off the queue by a caller to the node class.
// Queues are created on-demand when a message is sent to an address: the receiving node doesn't have to have
@GuardedBy("this") private var counter = 0 // -1 means stopped. // been created yet. If the node identified by the given handle has gone away/been shut down then messages
private val networkMap: MutableMap<InMemoryNodeHandle, Node> = Collections.synchronizedMap(HashMap()) // stack up here waiting for it to come back. The intent of this is to simulate a reliable messaging network.
private val messageQueues = HashMap<Handle, LinkedBlockingQueue<Message>>()
/** /**
* Creates a node and returns the new object that identifies its location on the network to senders, and the * Creates a node and returns the new object that identifies its location on the network to senders, and the
@ -49,35 +49,69 @@ public class InMemoryNetwork {
* executor. * executor.
*/ */
@Synchronized @Synchronized
fun createNode(manuallyPumped: Boolean): Pair<SingleMessageRecipient, MessagingSystemBuilder<Node>> { fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingSystemBuilder<Node>> {
check(counter >= 0) { "In memory network stopped: please recreate. "} check(counter >= 0) { "In memory network stopped: please recreate. "}
val builder = createNodeWithID(manuallyPumped, counter) as Builder
val id = InMemoryNodeHandle(counter)
counter++ counter++
return Pair(id, Builder(manuallyPumped, id)) val id = builder.id
return Pair(id, builder)
} }
val entireNetwork: AllPossibleRecipients = object : AllPossibleRecipients {} /** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingSystemBuilder<Node> {
return Builder(manuallyPumped, Handle(id))
}
@Synchronized
private fun netSend(message: Message, recipients: MessageRecipients) {
when (recipients) {
is Handle -> getQueueForHandle(recipients).add(message)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
for (handle in networkMap.keys)
getQueueForHandle(handle).add(message)
}
else -> fail("Unknown type of recipient handle")
}
}
@Synchronized
private fun netNodeHasShutdown(handle: Handle) {
networkMap.remove(handle)
}
@Synchronized
private fun getQueueForHandle(recipients: Handle) = messageQueues.getOrPut(recipients) { LinkedBlockingQueue() }
val everyoneOnline: AllPossibleRecipients = object : AllPossibleRecipients {}
@Synchronized @Synchronized
fun stop() { fun stop() {
for (node in networkMap.values) { // toArrayList here just copies the collection, which we need because node.stop() will delete itself from
// the network map by calling netNodeHasShutdown. So we would get a CoModException if we didn't copy first.
for (node in networkMap.values.toArrayList())
node.stop() node.stop()
}
counter = -1 counter = -1
networkMap.clear()
messageQueues.clear()
} }
private inner class Builder(val manuallyPumped: Boolean, val id: InMemoryNodeHandle) : MessagingSystemBuilder<Node> { inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingSystemBuilder<Node> {
override fun start(): ListenableFuture<Node> { override fun start(): ListenableFuture<Node> {
val node = Node(manuallyPumped) synchronized(this@InMemoryNetwork) {
networkMap[id] = node val node = Node(manuallyPumped, id)
return Futures.immediateFuture(node) networkMap[id] = node
return Futures.immediateFuture(node)
}
} }
} }
private class InMemoryNodeHandle(val id: Int) : SingleMessageRecipient { class Handle(val id: Int) : SingleMessageRecipient {
override fun toString() = "In memory node $id" override fun toString() = "In memory node $id"
override fun equals(other: Any?) = other is InMemoryNodeHandle && other.id == id override fun equals(other: Any?) = other is Handle && other.id == id
override fun hashCode() = id.hashCode() override fun hashCode() = id.hashCode()
} }
@ -88,52 +122,55 @@ public class InMemoryNetwork {
* *
* An instance can be obtained by creating a builder and then using the start method. * An instance can be obtained by creating a builder and then using the start method.
*/ */
inner class Node(private val manuallyPumped: Boolean): MessagingSystem { inner class Node(private val manuallyPumped: Boolean, private val handle: Handle): MessagingSystem {
inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@GuardedBy("this") @GuardedBy("this")
protected val handlers: MutableList<Handler> = ArrayList() protected val handlers: MutableList<Handler> = ArrayList()
@GuardedBy("this") @GuardedBy("this")
protected var running = true protected var running = true
protected val q = LinkedBlockingQueue<Message>() @GuardedBy("this")
protected val pendingRedelivery = LinkedList<Message>()
protected val backgroundThread = if (manuallyPumped) null else thread(isDaemon = true, name = "In-memory message dispatcher ") { protected val backgroundThread = if (manuallyPumped) null else thread(isDaemon = true, name = "In-memory message dispatcher ") {
while (!currentThread.isInterrupted) pumpInternal(true) while (!currentThread.isInterrupted) {
try {
pumpInternal(true)
} catch(e: InterruptedException) {
if (synchronized(this) { running })
throw e
}
}
} }
@Synchronized @Synchronized
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running) checkRunning()
return Handler(executor, topic, callback).apply { handlers.add(this) } val handler = Handler(executor, topic, callback).apply { handlers.add(this) }
if (pendingRedelivery.isNotEmpty()) {
val items = ArrayList(pendingRedelivery)
pendingRedelivery.clear()
items.forEach { netSend(it, handle) }
}
return handler
} }
@Synchronized @Synchronized
override fun removeMessageHandler(registration: MessageHandlerRegistration) { override fun removeMessageHandler(registration: MessageHandlerRegistration) {
check(running) checkRunning()
check(handlers.remove(registration as Handler)) check(handlers.remove(registration as Handler))
} }
@Synchronized @Synchronized
override fun send(message: Message, target: MessageRecipients) { override fun send(message: Message, target: MessageRecipients) {
check(running) checkRunning()
L.trace { "Sending message of topic '${message.topic}' to '$target'" } netSend(message, target)
when (target) {
is InMemoryNodeHandle -> {
val node = networkMap[target] ?: throw IllegalArgumentException("Unknown message recipient: $target")
node.q.put(message)
}
entireNetwork -> {
for (node in networkMap.values) {
node.q.put(message)
}
}
else -> throw IllegalArgumentException("Unhandled type of target: $target")
}
} }
@Synchronized @Synchronized
override fun stop() { override fun stop() {
backgroundThread?.interrupt()
running = false running = false
backgroundThread?.interrupt()
netNodeHasShutdown(handle)
} }
/** Returns the given (topic, data) pair as a newly created message object.*/ /** Returns the given (topic, data) pair as a newly created message object.*/
@ -151,16 +188,22 @@ public class InMemoryNetwork {
/** /**
* Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block * Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block
* is true, waits until one has been provided on a different thread via send. If block is false, the return result * is true, waits until one has been provided on a different thread via send. If block is false, the return
* indicates whether a message was delivered or not. * result indicates whether a message was delivered or not.
*/ */
fun pump(block: Boolean): Boolean { fun pump(block: Boolean): Boolean {
check(manuallyPumped) check(manuallyPumped)
synchronized(this) { check(running) } checkRunning()
return pumpInternal(block) return pumpInternal(block)
} }
@Synchronized
private fun checkRunning() {
check(running)
}
private fun pumpInternal(block: Boolean): Boolean { private fun pumpInternal(block: Boolean): Boolean {
val q = getQueueForHandle(handle)
val message = if (block) q.take() else q.poll() val message = if (block) q.take() else q.poll()
if (message == null) if (message == null)
@ -170,6 +213,18 @@ public class InMemoryNetwork {
handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic } handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }
} }
if (deliverTo.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
synchronized(this) {
pendingRedelivery.add(message)
}
return false
}
for (handler in deliverTo) { for (handler in deliverTo) {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time. // Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute { handler.callback(message, handler) } (handler.executor ?: MoreExecutors.directExecutor()).execute { handler.callback(message, handler) }

View File

@ -9,7 +9,6 @@
package core.messaging package core.messaging
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import core.serialization.SerializeableWithKryo
import core.serialization.deserialize import core.serialization.deserialize
import core.serialization.serialize import core.serialization.serialize
import java.time.Duration import java.time.Duration
@ -85,15 +84,15 @@ fun MessagingSystem.runOnNextMessage(topic: String = "", executor: Executor? = n
} }
} }
fun MessagingSystem.send(topic: String, to: MessageRecipients, obj: SerializeableWithKryo) = send(createMessage(topic, obj.serialize()), to) fun MessagingSystem.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize()), to)
/** /**
* Registers a handler for the given topic that runs the given callback with the message content deserialised to the * Registers a handler for the given topic that runs the given callback with the message content deserialised to the
* given type, and then removes itself. * given type, and then removes itself.
*/ */
inline fun <reified T : SerializeableWithKryo> MessagingSystem.runOnNextMessageWith(topic: String = "", inline fun <reified T : Any> MessagingSystem.runOnNextMessageWith(topic: String = "",
executor: Executor? = null, executor: Executor? = null,
noinline callback: (T) -> Unit) { noinline callback: (T) -> Unit) {
addMessageHandler(topic, executor) { msg, reg -> addMessageHandler(topic, executor) { msg, reg ->
callback(msg.data.deserialize<T>()) callback(msg.data.deserialize<T>())
removeMessageHandler(reg) removeMessageHandler(reg)
@ -109,8 +108,8 @@ inline fun <reified T : SerializeableWithKryo> MessagingSystem.runOnNextMessageW
* A specific implementation of the controller class will have extra features that let you customise it before starting * A specific implementation of the controller class will have extra features that let you customise it before starting
* it up. * it up.
*/ */
interface MessagingSystemBuilder<T : MessagingSystem> { interface MessagingSystemBuilder<out T : MessagingSystem> {
fun start(): ListenableFuture<T> fun start(): ListenableFuture<out T>
} }
interface MessageHandlerRegistration interface MessageHandlerRegistration

View File

@ -10,21 +10,23 @@
package core.messaging package core.messaging
import core.serialization.deserialize
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import java.util.* import java.util.*
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFails import kotlin.test.assertFails
import kotlin.test.assertFalse
import kotlin.test.assertTrue import kotlin.test.assertTrue
open class TestWithInMemoryNetwork { open class TestWithInMemoryNetwork {
val nodes: MutableMap<SingleMessageRecipient, InMemoryNetwork.Node> = HashMap() val nodes: MutableMap<InMemoryNetwork.Handle, InMemoryNetwork.Node> = HashMap()
lateinit var network: InMemoryNetwork lateinit var network: InMemoryNetwork
fun makeNode(): Pair<SingleMessageRecipient, InMemoryNetwork.Node> { fun makeNode(inBackground: Boolean = false): Pair<InMemoryNetwork.Handle, InMemoryNetwork.Node> {
// The manuallyPumped = true bit means that we must call the pump method on the system in order to // The manuallyPumped = true bit means that we must call the pump method on the system in order to
val (address, builder) = network.createNode(manuallyPumped = true) val (address, builder) = network.createNode(!inBackground)
val node = builder.start().get() val node = builder.start().get()
nodes[address] = node nodes[address] = node
return Pair(address, node) return Pair(address, node)
@ -41,12 +43,12 @@ open class TestWithInMemoryNetwork {
network.stop() network.stop()
} }
fun pumpAll() = nodes.values.map { it.pump(false) } fun pumpAll(blocking: Boolean) = nodes.values.map { it.pump(blocking) }
// Keep calling "pump" in rounds until every node in the network reports that it had nothing to do. // Keep calling "pump" in rounds until every node in the network reports that it had nothing to do
fun <T> runNetwork(body: () -> T): T { fun <T> runNetwork(body: () -> T): T {
val result = body() val result = body()
while (pumpAll().any { it }) {} while (pumpAll(false).any { it }) {}
return result return result
} }
} }
@ -111,8 +113,44 @@ class InMemoryMessagingTests : TestWithInMemoryNetwork() {
var counter = 0 var counter = 0
listOf(node1, node2, node3).forEach { it.addMessageHandler { msg, registration -> counter++ } } listOf(node1, node2, node3).forEach { it.addMessageHandler { msg, registration -> counter++ } }
runNetwork { runNetwork {
node1.send(node2.createMessage("test.topic", bits), network.entireNetwork) node1.send(node2.createMessage("test.topic", bits), network.everyoneOnline)
} }
assertEquals(3, counter) assertEquals(3, counter)
} }
@Test
fun downAndUp() {
// Test (re)delivery of messages to nodes that aren't created yet, or were stopped and then restarted.
// The purpose of this functionality is to simulate a reliable messaging system that keeps trying until
// messages are delivered.
val (addr1, node1) = makeNode()
var (addr2, node2) = makeNode()
node1.send("test.topic", addr2, "hello!")
node2.pump(false) // No handler registered, so the message goes into a holding area.
var runCount = 0
node2.addMessageHandler("test.topic") { msg, registration ->
if (msg.data.deserialize<String>() == "hello!")
runCount++
}
node2.pump(false) // Try again now the handler is registered
assertEquals(1, runCount)
// Shut node2 down for a while. Node 1 keeps sending it messages though.
node2.stop()
node1.send("test.topic", addr2, "are you there?")
node1.send("test.topic", addr2, "wake up!")
// Now re-create node2 with the same address as last time, and re-register a message handler.
// Check that the messages that were sent whilst it was gone are still there, waiting for it.
node2 = network.createNodeWithID(true, addr2.id).start().get()
node2.addMessageHandler("test.topic") { a, b -> runCount++ }
assertTrue(node2.pump(false))
assertEquals(2, runCount)
assertTrue(node2.pump(false))
assertEquals(3, runCount)
assertFalse(node2.pump(false))
assertEquals(3, runCount)
}
} }