From 4c87dc29817060bb2080c948ed05ebf14694ba6a Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Fri, 4 Dec 2015 15:52:49 +0000 Subject: [PATCH] First draft of a basic messaging module interface (VERY ROUGH). Ideally, is something that can have an e.g. Kafka backend, with a full P2P implementation later. --- .../kotlin/core/messaging/InMemoryNetwork.kt | 181 ++++++++++++++++++ src/main/kotlin/core/messaging/Messaging.kt | 123 ++++++++++++ .../core/messaging/InMemoryMessagingTests.kt | 119 ++++++++++++ 3 files changed, 423 insertions(+) create mode 100644 src/main/kotlin/core/messaging/InMemoryNetwork.kt create mode 100644 src/main/kotlin/core/messaging/Messaging.kt create mode 100644 src/test/kotlin/core/messaging/InMemoryMessagingTests.kt diff --git a/src/main/kotlin/core/messaging/InMemoryNetwork.kt b/src/main/kotlin/core/messaging/InMemoryNetwork.kt new file mode 100644 index 0000000000..45da29a228 --- /dev/null +++ b/src/main/kotlin/core/messaging/InMemoryNetwork.kt @@ -0,0 +1,181 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.messaging + +import com.google.common.util.concurrent.Futures +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.MoreExecutors +import core.sha256 +import core.utilities.loggerFor +import core.utilities.trace +import java.time.Instant +import java.util.* +import java.util.concurrent.Executor +import java.util.concurrent.LinkedBlockingQueue +import javax.annotation.concurrent.GuardedBy +import javax.annotation.concurrent.ThreadSafe +import kotlin.concurrent.currentThread +import kotlin.concurrent.thread + +/** + * An in-memory network allows you to manufacture [Node]s for a set of participants. Each + * [Node] maintains a queue of messages it has received, and a background thread that dispatches + * messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which + * case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit + * testing). + */ +@ThreadSafe +public class InMemoryNetwork { + companion object { + private val L = loggerFor() + } + + @GuardedBy("this") private var counter = 0 // -1 means stopped. + private val networkMap: MutableMap = Collections.synchronizedMap(HashMap()) + + /** + * Creates a node and returns the new object that identifies its location on the network to senders, and the + * [Node] that the recipient/in-memory node uses to receive messages and send messages itself. + * + * If [manuallyPumped] is set to true, then you are expected to call the [Node.pump] method on the [Node] + * in order to cause the delivery of a single message, which will occur on the thread of the caller. If set to false + * then this class will set up a background thread to deliver messages asynchronously, if the handler specifies no + * executor. + */ + @Synchronized + fun createNode(manuallyPumped: Boolean): Pair> { + check(counter >= 0) { "In memory network stopped: please recreate. "} + + val id = InMemoryNodeHandle(counter) + counter++ + return Pair(id, Builder(manuallyPumped, id)) + } + + val entireNetwork: AllPossibleRecipients = object : AllPossibleRecipients {} + + @Synchronized + fun stop() { + for (node in networkMap.values) { + node.stop() + } + counter = -1 + } + + private inner class Builder(val manuallyPumped: Boolean, val id: InMemoryNodeHandle) : MessagingSystemBuilder { + override fun start(): ListenableFuture { + val node = Node(manuallyPumped) + networkMap[id] = node + return Futures.immediateFuture(node) + } + } + + private class InMemoryNodeHandle(val id: Int) : SingleMessageRecipient { + override fun toString() = "In memory node $id" + override fun equals(other: Any?) = other is InMemoryNodeHandle && other.id == id + override fun hashCode() = id.hashCode() + } + + /** + * An [Node] provides a [MessagingSystem] that isn't backed by any kind of network or disk storage + * system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience + * when all entities on 'the network' are being simulated in-process. + * + * An instance can be obtained by creating a builder and then using the start method. + */ + inner class Node(private val manuallyPumped: Boolean): MessagingSystem { + inner class Handler(val executor: Executor?, val topic: String, val callback: (Message) -> Unit) : MessageHandlerRegistration + @GuardedBy("this") + protected val handlers: MutableList = ArrayList() + @GuardedBy("this") + protected var running = true + protected val q = LinkedBlockingQueue() + + protected val backgroundThread = if (manuallyPumped) null else thread(isDaemon = true, name = "In-memory message dispatcher ") { + while (!currentThread.isInterrupted) pumpInternal(true) + } + + @Synchronized + override fun addMessageHandler(executor: Executor?, topic: String, callback: (Message) -> Unit): MessageHandlerRegistration { + check(running) + return Handler(executor, topic, callback).apply { handlers.add(this) } + } + + @Synchronized + override fun removeMessageHandler(registration: MessageHandlerRegistration) { + check(running) + check(handlers.remove(registration as Handler)) + } + + @Synchronized + override fun send(message: Message, target: MessageRecipients) { + check(running) + L.trace { "Sending $message to '$target'" } + when (target) { + is InMemoryNodeHandle -> { + val node = networkMap[target] ?: throw IllegalArgumentException("Unknown message recipient: $target") + node.q.put(message) + } + entireNetwork -> { + for (node in networkMap.values) { + node.q.put(message) + } + } + else -> throw IllegalArgumentException("Unhandled type of target: $target") + } + } + + @Synchronized + override fun stop() { + backgroundThread?.interrupt() + running = false + } + + /** Returns the given (topic, data) pair as a newly created message object.*/ + override fun createMessage(topic: String, data: ByteArray): Message { + return object : Message { + override val topic: String get() = topic + override val data: ByteArray get() = data + override val debugTimestamp: Instant = Instant.now() + override fun serialise(): ByteArray = this.serialise() + override val debugMessageID: String get() = serialise().sha256().prefixChars() + + override fun toString() = topic + "#" + String(data) + } + } + + /** + * Delivers a single message from the internal queue. If there are no messages waiting to be delivered and block + * is true, waits until one has been provided on a different thread via send. If block is false, the return result + * indicates whether a message was delivered or not. + */ + fun pump(block: Boolean): Boolean { + check(manuallyPumped) + synchronized(this) { check(running) } + return pumpInternal(block) + } + + private fun pumpInternal(block: Boolean): Boolean { + val message = if (block) q.take() else q.poll() + + if (message == null) + return false + + val deliverTo = synchronized(this) { + handlers.filter { if (it.topic.isBlank()) true else message.topic == it.topic } + } + + for (handler in deliverTo) { + // Now deliver via the requested executor, or on this thread if no executor was provided at registration time. + (handler.executor ?: MoreExecutors.directExecutor()).execute { handler.callback(message) } + } + + return true + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/core/messaging/Messaging.kt b/src/main/kotlin/core/messaging/Messaging.kt new file mode 100644 index 0000000000..854a8f24d1 --- /dev/null +++ b/src/main/kotlin/core/messaging/Messaging.kt @@ -0,0 +1,123 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +package core.messaging + +import com.google.common.util.concurrent.ListenableFuture +import java.time.Duration +import java.time.Instant +import java.util.concurrent.Executor +import javax.annotation.concurrent.ThreadSafe + +/** + * A [MessagingSystem] sits at the boundary between a message routing / networking layer and the core platform code. + * + * A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the + * membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message + * _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed. + * + * Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer + * is *reliable* and as such messages may be stored to disk once queued. + */ +@ThreadSafe +interface MessagingSystem { + /** + * The provided function will be invoked for each received message whose topic matches the given string, on the given + * executor. The topic can be the empty string to match all messages. + * + * If no executor is received then the callback will run on threads provided by the messaging system, and the + * callback is expected to be thread safe as a result. + * + * The returned object is an opaque handle that may be used to un-register handlers later with [addMessageHandler]. + * + * If the callback throws an exception then the message is discarded and will not be retried, unless the exception + * is a subclass of [RetryMessageLaterException], in which case the message will be queued and attempted later. + */ + fun addMessageHandler(executor: Executor? = null, topic: String = "", callback: (Message) -> Unit): MessageHandlerRegistration + + /** + * Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once + * this method has returned, although executions that are currently in flight will not be interrupted. + * + * @throws IllegalArgumentException if the given registration isn't valid for this messaging system. + * @throws IllegalStateException if the given registration was already de-registered. + */ + fun removeMessageHandler(registration: MessageHandlerRegistration) + + /** + * Sends a message to the given receiver. The details of how receivers are identified is up to the messaging + * implementation: the type system provides an opaque high level view, with more fine grained control being + * available via type casting. Once this function returns the message is queued for delivery but not necessarily + * delivered: if the recipients are offline then the message could be queued hours or days later. + * + * There is no way to know if a message has been received. If your protocol requires this, you need the recipient + * to send an ACK message back. + */ + fun send(message: Message, target: MessageRecipients) + + fun stop() + + /** + * Returns an initialised [Message] with the current time, etc, already filled in. + */ + fun createMessage(topic: String, data: ByteArray): Message +} + +/** + * This class lets you start up a [MessagingSystem]. Its purpose is to stop you from getting access to the methods + * on the messaging system interface until you have successfully started up the system. One of these objects should + * be the only way to obtain a reference to a [MessagingSystem]. Startup may be a slow process: some implementations + * may let you cast the returned future to an object that lets you get status info. + * + * A specific implementation of the controller class will have extra features that let you customise it before starting + * it up. + */ +interface MessagingSystemBuilder { + fun start(): ListenableFuture +} + +interface MessageHandlerRegistration + +class RetryMessageLaterException : Exception() { + /** If set, the message will be re-queued and retried after the requested interval. */ + var delayPeriod: Duration? = null +} + +/** + * A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in + * Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor + * specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage". + * + * The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the + * message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on. + * These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of + * the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id. + */ +interface Message { + val topic: String + val data: ByteArray + val debugTimestamp: Instant + val debugMessageID: String + fun serialise(): ByteArray +} + +/** A singleton that's useful for validating topic strings */ +object TopicStringValidator { + private val regex = "[a-zA-Z0-9.]+".toPattern() + /** @throws IllegalArgumentException if the given topic contains invalid characters */ + fun check(tag: String) = require(regex.matcher(tag).matches()) +} + +/** The interface for a group of message recipients (which may contain only one recipient) */ +interface MessageRecipients +/** A base class for the case of point-to-point messages */ +interface SingleMessageRecipient : MessageRecipients +/** A base class for a set of recipients specifically identified by the sender. */ +interface MessageRecipientGroup : MessageRecipients +/** A special base class for the set of all possible recipients, without having to identify who they all are. */ +interface AllPossibleRecipients : MessageRecipients \ No newline at end of file diff --git a/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt b/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt new file mode 100644 index 0000000000..7a4a2d3ad1 --- /dev/null +++ b/src/test/kotlin/core/messaging/InMemoryMessagingTests.kt @@ -0,0 +1,119 @@ +/* + * Copyright 2015 Distributed Ledger Group LLC. Distributed as Licensed Company IP to DLG Group Members + * pursuant to the August 7, 2015 Advisory Services Agreement and subject to the Company IP License terms + * set forth therein. + * + * All other rights reserved. + */ + +@file:Suppress("UNUSED_VARIABLE") + +package core.messaging + +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertFails +import kotlin.test.assertTrue + +class InMemoryMessagingTests { + val nodes: MutableMap = HashMap() + lateinit var network: InMemoryNetwork + + init { + // BriefLogFormatter.initVerbose() + } + + fun makeNode(): Pair { + // The manuallyPumped = true bit means that we must call the pump method on the system in order to + val (address, builder) = network.createNode(manuallyPumped = true) + val node = builder.start().get() + nodes[address] = node + return Pair(address, node) + } + + fun pumpAll() { + nodes.values.forEach { it.pump(false) } + } + + // Utilities to help define messaging rounds. + fun roundWithPumpings(times: Int, body: () -> Unit) { + body() + repeat(times) { pumpAll() } + } + + fun round(body: () -> Unit) = roundWithPumpings(1, body) + + @Before + fun before() { + network = InMemoryNetwork() + nodes.clear() + } + + @After + fun after() { + network.stop() + } + + @Test + fun topicStringValidation() { + TopicStringValidator.check("this.is.ok") + TopicStringValidator.check("this.is.OkAlso") + assertFails { + TopicStringValidator.check("this.is.not-ok") + } + assertFails { + TopicStringValidator.check("") + } + assertFails { + TopicStringValidator.check("this.is not ok") // Spaces + } + } + + @Test + fun basics() { + val (addr1, node1) = makeNode() + val (addr2, node2) = makeNode() + val (addr3, node3) = makeNode() + + val bits = "test-content".toByteArray() + var finalDelivery: Message? = null + + with(node2) { + addMessageHandler { + send(it, addr3) + } + } + + with(node3) { + addMessageHandler { + finalDelivery = it + } + } + + // Node 1 sends a message and it should end up in finalDelivery, after we pump each node. + roundWithPumpings(2) { + node1.send(node1.createMessage("test.topic", bits), addr2) + } + + assertTrue(Arrays.equals(finalDelivery!!.data, bits)) + } + + @Test + fun broadcast() { + val (addr1, node1) = makeNode() + val (addr2, node2) = makeNode() + val (addr3, node3) = makeNode() + + val bits = "test-content".toByteArray() + + var counter = 0 + listOf(node1, node2, node3).forEach { it.addMessageHandler { counter++ } } + round { + node1.send(node2.createMessage("test.topic", bits), network.entireNetwork) + } + assertEquals(3, counter) + } +} \ No newline at end of file