diff --git a/src/main/kotlin/core/Services.kt b/src/main/kotlin/core/Services.kt index c8f185bd64..eee9ff96f3 100644 --- a/src/main/kotlin/core/Services.kt +++ b/src/main/kotlin/core/Services.kt @@ -8,7 +8,7 @@ package core -import core.messaging.MessagingSystem +import core.messaging.MessagingService import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -101,5 +101,5 @@ interface ServiceHub { val identityService: IdentityService val timestampingService: TimestamperService val storageService: StorageService - val networkService: MessagingSystem // TODO: Rename class to be consistent. + val networkService: MessagingService // TODO: Rename class to be consistent. } \ No newline at end of file diff --git a/src/main/kotlin/core/messaging/InMemoryNetwork.kt b/src/main/kotlin/core/messaging/InMemoryNetwork.kt index 7511c9e25c..7f8b65af22 100644 --- a/src/main/kotlin/core/messaging/InMemoryNetwork.kt +++ b/src/main/kotlin/core/messaging/InMemoryNetwork.kt @@ -49,7 +49,7 @@ public class InMemoryNetwork { * executor. */ @Synchronized - fun createNode(manuallyPumped: Boolean): Pair> { + fun createNode(manuallyPumped: Boolean): Pair> { check(counter >= 0) { "In memory network stopped: please recreate. "} val builder = createNodeWithID(manuallyPumped, counter) as Builder counter++ @@ -58,7 +58,7 @@ public class InMemoryNetwork { } /** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */ - fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingSystemBuilder { + fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder { return Builder(manuallyPumped, Handle(id)) } @@ -99,7 +99,7 @@ public class InMemoryNetwork { messageQueues.clear() } - inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingSystemBuilder { + inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder { override fun start(): ListenableFuture { synchronized(this@InMemoryNetwork) { val node = Node(manuallyPumped, id) @@ -116,13 +116,13 @@ public class InMemoryNetwork { } /** - * An [Node] provides a [MessagingSystem] that isn't backed by any kind of network or disk storage + * An [Node] provides a [MessagingService] that isn't backed by any kind of network or disk storage * system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience * when all entities on 'the network' are being simulated in-process. * * An instance can be obtained by creating a builder and then using the start method. */ - inner class Node(private val manuallyPumped: Boolean, private val handle: Handle): MessagingSystem { + inner class Node(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService { inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @GuardedBy("this") protected val handlers: MutableList = ArrayList() diff --git a/src/main/kotlin/core/messaging/Messaging.kt b/src/main/kotlin/core/messaging/Messaging.kt index 74b1e4c4ca..2e523102f6 100644 --- a/src/main/kotlin/core/messaging/Messaging.kt +++ b/src/main/kotlin/core/messaging/Messaging.kt @@ -15,7 +15,7 @@ import java.util.concurrent.Executor import javax.annotation.concurrent.ThreadSafe /** - * A [MessagingSystem] sits at the boundary between a message routing / networking layer and the core platform code. + * A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code. * * A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the * membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message @@ -25,12 +25,12 @@ import javax.annotation.concurrent.ThreadSafe * is *reliable* and as such messages may be stored to disk once queued. */ @ThreadSafe -interface MessagingSystem { +interface MessagingService { /** * The provided function will be invoked for each received message whose topic matches the given string, on the given * executor. The topic can be the empty string to match all messages. * - * If no executor is received then the callback will run on threads provided by the messaging system, and the + * If no executor is received then the callback will run on threads provided by the messaging service, and the * callback is expected to be thread safe as a result. * * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. @@ -43,7 +43,7 @@ interface MessagingSystem { * Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once * this method has returned, although executions that are currently in flight will not be interrupted. * - * @throws IllegalArgumentException if the given registration isn't valid for this messaging system. + * @throws IllegalArgumentException if the given registration isn't valid for this messaging service. * @throws IllegalStateException if the given registration was already de-registered. */ fun removeMessageHandler(registration: MessageHandlerRegistration) @@ -70,27 +70,27 @@ interface MessagingSystem { /** * Registers a handler for the given topic that runs the given callback with the message and then removes itself. This * is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback doesn't - * take the registration object, unlike the callback to [MessagingSystem.addMessageHandler]. + * take the registration object, unlike the callback to [MessagingService.addMessageHandler]. */ -fun MessagingSystem.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) { +fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) { addMessageHandler(topic, executor) { msg, reg -> callback(msg) removeMessageHandler(reg) } } -fun MessagingSystem.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize()), to) +fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize()), to) /** - * This class lets you start up a [MessagingSystem]. Its purpose is to stop you from getting access to the methods - * on the messaging system interface until you have successfully started up the system. One of these objects should - * be the only way to obtain a reference to a [MessagingSystem]. Startup may be a slow process: some implementations + * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods + * on the messaging service interface until you have successfully started up the system. One of these objects should + * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations * may let you cast the returned future to an object that lets you get status info. * * A specific implementation of the controller class will have extra features that let you customise it before starting * it up. */ -interface MessagingSystemBuilder { +interface MessagingServiceBuilder { fun start(): ListenableFuture } diff --git a/src/main/kotlin/core/messaging/StateMachines.kt b/src/main/kotlin/core/messaging/StateMachines.kt index 398253e96f..409be67968 100644 --- a/src/main/kotlin/core/messaging/StateMachines.kt +++ b/src/main/kotlin/core/messaging/StateMachines.kt @@ -124,7 +124,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) checkpointsMap[SecureHash.sha256(new)] = new } - private fun iterateStateMachine(c: Continuation, net: MessagingSystem, otherSide: MessageRecipients, + private fun iterateStateMachine(c: Continuation, net: MessagingService, otherSide: MessageRecipients, continuationInput: Any?, logger: Logger, prevPersistedBytes: ByteArray?): Continuation { // This will resume execution of the run() function inside the continuation at the place it left off. @@ -165,7 +165,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) } } - private fun setupNextMessageHandler(logger: Logger, net: MessagingSystem, nextState: Continuation, + private fun setupNextMessageHandler(logger: Logger, net: MessagingService, nextState: Continuation, otherSide: MessageRecipients, responseType: Class<*>, topic: String, prevPersistedBytes: ByteArray?) { val checkpoint = Checkpoint(nextState, otherSide, logger.name, topic, responseType.name) diff --git a/src/test/kotlin/core/testutils/TestUtils.kt b/src/test/kotlin/core/testutils/TestUtils.kt index 38714fe69b..d773365002 100644 --- a/src/test/kotlin/core/testutils/TestUtils.kt +++ b/src/test/kotlin/core/testutils/TestUtils.kt @@ -13,7 +13,7 @@ package core.testutils import com.google.common.io.BaseEncoding import contracts.* import core.* -import core.messaging.MessagingSystem +import core.messaging.MessagingService import core.visualiser.GraphVisualiser import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream @@ -121,7 +121,7 @@ class MockStorageService : StorageService { class MockServices( val wallet: WalletService?, val keyManagement: KeyManagementService?, - val net: MessagingSystem?, + val net: MessagingService?, val identity: IdentityService? = MockIdentityService, val storage: StorageService? = MockStorageService(), val timestamping: TimestamperService? = DUMMY_TIMESTAMPER @@ -134,7 +134,7 @@ class MockServices( get() = identity ?: throw UnsupportedOperationException() override val timestampingService: TimestamperService get() = timestamping ?: throw UnsupportedOperationException() - override val networkService: MessagingSystem + override val networkService: MessagingService get() = net ?: throw UnsupportedOperationException() override val storageService: StorageService get() = storage ?: throw UnsupportedOperationException()