First draft of a basic messaging module interface (VERY ROUGH). Ideally, is something that can have an e.g. Kafka backend, with a full P2P implementation later.

This commit is contained in:
Mike Hearn 2015-12-04 15:52:49 +00:00
parent 7881be07ed
commit 4c87dc2981
3 changed files with 423 additions and 0 deletions

View File

@ -0,0 +1,181 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.messaging
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import core.sha256
import core.utilities.loggerFor
import core.utilities.trace
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.GuardedBy
import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.currentThread
import kotlin.concurrent.thread
/**
* An in-memory network allows you to manufacture [Node]s for a set of participants. Each
* [Node] maintains a queue of messages it has received, and a background thread that dispatches
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
* testing).
*/
@ThreadSafe
public class InMemoryNetwork {
companion object {
private val L = loggerFor<InMemoryNetwork>()
}
@GuardedBy("this") private var counter = 0 // -1 means stopped.
private val networkMap: MutableMap<InMemoryNodeHandle, Node> = Collections.synchronizedMap(HashMap())
/**
* Creates a node and returns the new object that identifies its location on the network to senders, and the
* [Node] that the recipient/in-memory node uses to receive messages and send messages itself.
*
* If [manuallyPumped] is set to true, then you are expected to call the [Node.pump] method on the [Node]
* in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false
* then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no
* executor.
*/
@Synchronized
fun createNode(manuallyPumped: Boolean): Pair<SingleMessageRecipient, MessagingSystemBuilder<Node>> {
check(counter >= 0) { "In memory network stopped: please recreate. "}
val id = InMemoryNodeHandle(counter)
counter++
return Pair(id, Builder(manuallyPumped, id))
}
val entireNetwork: AllPossibleRecipients = object : AllPossibleRecipients {}
@Synchronized
fun stop() {
for (node in networkMap.values) {
node.stop()
}
counter = -1
}
private inner class Builder(val manuallyPumped: Boolean, val id: InMemoryNodeHandle) : MessagingSystemBuilder<Node> {
override fun start(): ListenableFuture<Node> {
val node = Node(manuallyPumped)
networkMap[id] = node
return Futures.immediateFuture(node)
}
}
private class InMemoryNodeHandle(val id: Int) : SingleMessageRecipient {
override fun toString() = "In memory node $id"
override fun equals(other: Any?) = other is InMemoryNodeHandle && other.id == id
override fun hashCode() = id.hashCode()
}
/**
* An [Node] provides a [MessagingSystem] that isn't backed by any kind of network or disk storage
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
* when all entities on 'the network' are being simulated in-process.
*
* An instance can be obtained by creating a builder and then using the start method.
*/
inner class Node(private val manuallyPumped: Boolean): MessagingSystem {
inner class Handler(val executor: Executor?, val topic: String, val callback: (Message) -> Unit) : MessageHandlerRegistration
@GuardedBy("this")
protected val handlers: MutableList<Handler> = ArrayList()
@GuardedBy("this")
protected var running = true
protected val q = LinkedBlockingQueue<Message>()
protected val backgroundThread = if (manuallyPumped) null else thread(isDaemon = true, name = "In-memory message dispatcher ") {
while (!currentThread.isInterrupted) pumpInternal(true)
}
@Synchronized
override fun addMessageHandler(executor: Executor?, topic: String, callback: (Message) -> Unit): MessageHandlerRegistration {
check(running)
return Handler(executor, topic, callback).apply { handlers.add(this) }
}
@Synchronized
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
check(running)
check(handlers.remove(registration as Handler))
}
@Synchronized
override fun send(message: Message, target: MessageRecipients) {
check(running)
L.trace { "Sending $message to '$target'" }
when (target) {
is InMemoryNodeHandle -> {
val node = networkMap[target] ?: throw IllegalArgumentException("Unknown message recipient: $target")
node.q.put(message)
}
entireNetwork -> {
for (node in networkMap.values) {
node.q.put(message)
}
}
else -> throw IllegalArgumentException("Unhandled type of target: $target")
}
}
@Synchronized
override fun stop() {
backgroundThread?.interrupt()
running = false
}
/** Returns the given (topic, data) pair as a newly created message object.*/
override fun createMessage(topic: String, data: ByteArray): Message {
return object : Message {
override val topic: String get() = topic
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val debugMessageID: String get() = serialise().sha256().prefixChars()
override fun toString() = topic + "#" + String(data)
}
}
/**
* Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block
* is true, waits until one has been provided on a different thread via send. If block is false, the return result
* indicates whether a message was delivered or not.
*/
fun pump(block: Boolean): Boolean {
check(manuallyPumped)
synchronized(this) { check(running) }
return pumpInternal(block)
}
private fun pumpInternal(block: Boolean): Boolean {
val message = if (block) q.take() else q.poll()
if (message == null)
return false
val deliverTo = synchronized(this) {
handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic }
}
for (handler in deliverTo) {
// Now deliver via the requested executor, or on this thread if no executor was provided at registration time.
(handler.executor ?: MoreExecutors.directExecutor()).execute { handler.callback(message) }
}
return true
}
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
package core.messaging
import com.google.common.util.concurrent.ListenableFuture
import java.time.Duration
import java.time.Instant
import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe
/**
* A [MessagingSystem] sits at the boundary between a message routing / networking layer and the core platform code.
*
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
* _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed.
*
* Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer
* is *reliable* and as such messages may be stored to disk once queued.
*/
@ThreadSafe
interface MessagingSystem {
/**
* The provided function will be invoked for each received message whose topic matches the given string, on the given
* executor. The topic can be the empty string to match all messages.
*
* If no executor is received then the callback will run on threads provided by the messaging system, and the
* callback is expected to be thread safe as a result.
*
* The returned object is an opaque handle that may be used to un-register handlers later with [addMessageHandler].
*
* If the callback throws an exception then the message is discarded and will not be retried, unless the exception
* is a subclass of [RetryMessageLaterException], in which case the message will be queued and attempted later.
*/
fun addMessageHandler(executor: Executor? = null, topic: String = "", callback: (Message) -> Unit): MessageHandlerRegistration
/**
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
* this method has returned, although executions that are currently in flight will not be interrupted.
*
* @throws IllegalArgumentException if the given registration isn't valid for this messaging system.
* @throws IllegalStateException if the given registration was already de-registered.
*/
fun removeMessageHandler(registration: MessageHandlerRegistration)
/**
* Sends a message to the given receiver. The details of how receivers are identified is up to the messaging
* implementation: the type system provides an opaque high level view, with more fine grained control being
* available via type casting. Once this function returns the message is queued for delivery but not necessarily
* delivered: if the recipients are offline then the message could be queued hours or days later.
*
* There is no way to know if a message has been received. If your protocol requires this, you need the recipient
* to send an ACK message back.
*/
fun send(message: Message, target: MessageRecipients)
fun stop()
/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*/
fun createMessage(topic: String, data: ByteArray): Message
}
/**
* This class lets you start up a [MessagingSystem]. Its purpose is to stop you from getting access to the methods
* on the messaging system interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingSystem]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
*
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
*/
interface MessagingSystemBuilder<T : MessagingSystem> {
fun start(): ListenableFuture<T>
}
interface MessageHandlerRegistration
class RetryMessageLaterException : Exception() {
/** If set, the message will be re-queued and retried after the requested interval. */
var delayPeriod: Duration? = null
}
/**
* A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in
* Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor
* specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage".
*
* The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the
* message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on.
* These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of
* the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id.
*/
interface Message {
val topic: String
val data: ByteArray
val debugTimestamp: Instant
val debugMessageID: String
fun serialise(): ByteArray
}
/** A singleton that's useful for validating topic strings */
object TopicStringValidator {
private val regex = "[a-zA-Z0-9.]+".toPattern()
/** @throws IllegalArgumentException if the given topic contains invalid characters */
fun check(tag: String) = require(regex.matcher(tag).matches())
}
/** The interface for a group of message recipients (which may contain only one recipient) */
interface MessageRecipients
/** A base class for the case of point-to-point messages */
interface SingleMessageRecipient : MessageRecipients
/** A base class for a set of recipients specifically identified by the sender. */
interface MessageRecipientGroup : MessageRecipients
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
interface AllPossibleRecipients : MessageRecipients

View File

@ -0,0 +1,119 @@
/*
* Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members
* pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms
* set forth therein.
*
* All other rights reserved.
*/
@file:Suppress("UNUSED_VARIABLE")
package core.messaging
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertTrue
class InMemoryMessagingTests {
val nodes: MutableMap<SingleMessageRecipient, InMemoryNetwork.Node> = HashMap()
lateinit var network: InMemoryNetwork
init {
// BriefLogFormatter.initVerbose()
}
fun makeNode(): Pair<SingleMessageRecipient, InMemoryNetwork.Node> {
// The manuallyPumped = true bit means that we must call the pump method on the system in order to
val (address, builder) = network.createNode(manuallyPumped = true)
val node = builder.start().get()
nodes[address] = node
return Pair(address, node)
}
fun pumpAll() {
nodes.values.forEach { it.pump(false) }
}
// Utilities to help define messaging rounds.
fun roundWithPumpings(times: Int, body: () -> Unit) {
body()
repeat(times) { pumpAll() }
}
fun round(body: () -> Unit) = roundWithPumpings(1, body)
@Before
fun before() {
network = InMemoryNetwork()
nodes.clear()
}
@After
fun after() {
network.stop()
}
@Test
fun topicStringValidation() {
TopicStringValidator.check("this.is.ok")
TopicStringValidator.check("this.is.OkAlso")
assertFails {
TopicStringValidator.check("this.is.not-ok")
}
assertFails {
TopicStringValidator.check("")
}
assertFails {
TopicStringValidator.check("this.is not ok") // Spaces
}
}
@Test
fun basics() {
val (addr1, node1) = makeNode()
val (addr2, node2) = makeNode()
val (addr3, node3) = makeNode()
val bits = "test-content".toByteArray()
var finalDelivery: Message? = null
with(node2) {
addMessageHandler {
send(it, addr3)
}
}
with(node3) {
addMessageHandler {
finalDelivery = it
}
}
// Node 1 sends a message and it should end up in finalDelivery, after we pump each node.
roundWithPumpings(2) {
node1.send(node1.createMessage("test.topic", bits), addr2)
}
assertTrue(Arrays.equals(finalDelivery!!.data, bits))
}
@Test
fun broadcast() {
val (addr1, node1) = makeNode()
val (addr2, node2) = makeNode()
val (addr3, node3) = makeNode()
val bits = "test-content".toByteArray()
var counter = 0
listOf(node1, node2, node3).forEach { it.addMessageHandler { counter++ } }
round {
node1.send(node2.createMessage("test.topic", bits), network.entireNetwork)
}
assertEquals(3, counter)
}
}