mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
Messaging: Rename MessagingSystem to MessagingService for consistency with other interfaces.
This commit is contained in:
parent
3f19e68b3f
commit
f15e24e7be
@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
package core
|
package core
|
||||||
|
|
||||||
import core.messaging.MessagingSystem
|
import core.messaging.MessagingService
|
||||||
import java.security.KeyPair
|
import java.security.KeyPair
|
||||||
import java.security.PrivateKey
|
import java.security.PrivateKey
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
@ -101,5 +101,5 @@ interface ServiceHub {
|
|||||||
val identityService: IdentityService
|
val identityService: IdentityService
|
||||||
val timestampingService: TimestamperService
|
val timestampingService: TimestamperService
|
||||||
val storageService: StorageService
|
val storageService: StorageService
|
||||||
val networkService: MessagingSystem // TODO: Rename class to be consistent.
|
val networkService: MessagingService // TODO: Rename class to be consistent.
|
||||||
}
|
}
|
@ -49,7 +49,7 @@ public class InMemoryNetwork {
|
|||||||
* executor.
|
* executor.
|
||||||
*/
|
*/
|
||||||
@Synchronized
|
@Synchronized
|
||||||
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingSystemBuilder<Node>> {
|
fun createNode(manuallyPumped: Boolean): Pair<Handle, MessagingServiceBuilder<Node>> {
|
||||||
check(counter >= 0) { "In memory network stopped: please recreate. "}
|
check(counter >= 0) { "In memory network stopped: please recreate. "}
|
||||||
val builder = createNodeWithID(manuallyPumped, counter) as Builder
|
val builder = createNodeWithID(manuallyPumped, counter) as Builder
|
||||||
counter++
|
counter++
|
||||||
@ -58,7 +58,7 @@ public class InMemoryNetwork {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */
|
/** Creates a node at the given address: useful if you want to recreate a node to simulate a restart */
|
||||||
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingSystemBuilder<Node> {
|
fun createNodeWithID(manuallyPumped: Boolean, id: Int): MessagingServiceBuilder<Node> {
|
||||||
return Builder(manuallyPumped, Handle(id))
|
return Builder(manuallyPumped, Handle(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +99,7 @@ public class InMemoryNetwork {
|
|||||||
messageQueues.clear()
|
messageQueues.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingSystemBuilder<Node> {
|
inner class Builder(val manuallyPumped: Boolean, val id: Handle) : MessagingServiceBuilder<Node> {
|
||||||
override fun start(): ListenableFuture<Node> {
|
override fun start(): ListenableFuture<Node> {
|
||||||
synchronized(this@InMemoryNetwork) {
|
synchronized(this@InMemoryNetwork) {
|
||||||
val node = Node(manuallyPumped, id)
|
val node = Node(manuallyPumped, id)
|
||||||
@ -116,13 +116,13 @@ public class InMemoryNetwork {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An [Node] provides a [MessagingSystem] that isn't backed by any kind of network or disk storage
|
* An [Node] provides a [MessagingService] that isn't backed by any kind of network or disk storage
|
||||||
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
|
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
|
||||||
* when all entities on 'the network' are being simulated in-process.
|
* when all entities on 'the network' are being simulated in-process.
|
||||||
*
|
*
|
||||||
* An instance can be obtained by creating a builder and then using the start method.
|
* An instance can be obtained by creating a builder and then using the start method.
|
||||||
*/
|
*/
|
||||||
inner class Node(private val manuallyPumped: Boolean, private val handle: Handle): MessagingSystem {
|
inner class Node(private val manuallyPumped: Boolean, private val handle: Handle): MessagingService {
|
||||||
inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||||
@GuardedBy("this")
|
@GuardedBy("this")
|
||||||
protected val handlers: MutableList<Handler> = ArrayList()
|
protected val handlers: MutableList<Handler> = ArrayList()
|
||||||
|
@ -15,7 +15,7 @@ import java.util.concurrent.Executor
|
|||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A [MessagingSystem] sits at the boundary between a message routing / networking layer and the core platform code.
|
* A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code.
|
||||||
*
|
*
|
||||||
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
|
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
|
||||||
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
|
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
|
||||||
@ -25,12 +25,12 @@ import javax.annotation.concurrent.ThreadSafe
|
|||||||
* is *reliable* and as such messages may be stored to disk once queued.
|
* is *reliable* and as such messages may be stored to disk once queued.
|
||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
interface MessagingSystem {
|
interface MessagingService {
|
||||||
/**
|
/**
|
||||||
* The provided function will be invoked for each received message whose topic matches the given string, on the given
|
* The provided function will be invoked for each received message whose topic matches the given string, on the given
|
||||||
* executor. The topic can be the empty string to match all messages.
|
* executor. The topic can be the empty string to match all messages.
|
||||||
*
|
*
|
||||||
* If no executor is received then the callback will run on threads provided by the messaging system, and the
|
* If no executor is received then the callback will run on threads provided by the messaging service, and the
|
||||||
* callback is expected to be thread safe as a result.
|
* callback is expected to be thread safe as a result.
|
||||||
*
|
*
|
||||||
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
||||||
@ -43,7 +43,7 @@ interface MessagingSystem {
|
|||||||
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
|
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
|
||||||
* this method has returned, although executions that are currently in flight will not be interrupted.
|
* this method has returned, although executions that are currently in flight will not be interrupted.
|
||||||
*
|
*
|
||||||
* @throws IllegalArgumentException if the given registration isn't valid for this messaging system.
|
* @throws IllegalArgumentException if the given registration isn't valid for this messaging service.
|
||||||
* @throws IllegalStateException if the given registration was already de-registered.
|
* @throws IllegalStateException if the given registration was already de-registered.
|
||||||
*/
|
*/
|
||||||
fun removeMessageHandler(registration: MessageHandlerRegistration)
|
fun removeMessageHandler(registration: MessageHandlerRegistration)
|
||||||
@ -70,27 +70,27 @@ interface MessagingSystem {
|
|||||||
/**
|
/**
|
||||||
* Registers a handler for the given topic that runs the given callback with the message and then removes itself. This
|
* Registers a handler for the given topic that runs the given callback with the message and then removes itself. This
|
||||||
* is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback doesn't
|
* is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback doesn't
|
||||||
* take the registration object, unlike the callback to [MessagingSystem.addMessageHandler].
|
* take the registration object, unlike the callback to [MessagingService.addMessageHandler].
|
||||||
*/
|
*/
|
||||||
fun MessagingSystem.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) {
|
fun MessagingService.runOnNextMessage(topic: String = "", executor: Executor? = null, callback: (Message) -> Unit) {
|
||||||
addMessageHandler(topic, executor) { msg, reg ->
|
addMessageHandler(topic, executor) { msg, reg ->
|
||||||
callback(msg)
|
callback(msg)
|
||||||
removeMessageHandler(reg)
|
removeMessageHandler(reg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun MessagingSystem.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize()), to)
|
fun MessagingService.send(topic: String, to: MessageRecipients, obj: Any) = send(createMessage(topic, obj.serialize()), to)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class lets you start up a [MessagingSystem]. Its purpose is to stop you from getting access to the methods
|
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||||
* on the messaging system interface until you have successfully started up the system. One of these objects should
|
* on the messaging service interface until you have successfully started up the system. One of these objects should
|
||||||
* be the only way to obtain a reference to a [MessagingSystem]. Startup may be a slow process: some implementations
|
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
|
||||||
* may let you cast the returned future to an object that lets you get status info.
|
* may let you cast the returned future to an object that lets you get status info.
|
||||||
*
|
*
|
||||||
* A specific implementation of the controller class will have extra features that let you customise it before starting
|
* A specific implementation of the controller class will have extra features that let you customise it before starting
|
||||||
* it up.
|
* it up.
|
||||||
*/
|
*/
|
||||||
interface MessagingSystemBuilder<out T : MessagingSystem> {
|
interface MessagingServiceBuilder<out T : MessagingService> {
|
||||||
fun start(): ListenableFuture<out T>
|
fun start(): ListenableFuture<out T>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,7 +124,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
|||||||
checkpointsMap[SecureHash.sha256(new)] = new
|
checkpointsMap[SecureHash.sha256(new)] = new
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun iterateStateMachine(c: Continuation, net: MessagingSystem, otherSide: MessageRecipients,
|
private fun iterateStateMachine(c: Continuation, net: MessagingService, otherSide: MessageRecipients,
|
||||||
continuationInput: Any?, logger: Logger,
|
continuationInput: Any?, logger: Logger,
|
||||||
prevPersistedBytes: ByteArray?): Continuation {
|
prevPersistedBytes: ByteArray?): Continuation {
|
||||||
// This will resume execution of the run() function inside the continuation at the place it left off.
|
// This will resume execution of the run() function inside the continuation at the place it left off.
|
||||||
@ -165,7 +165,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun setupNextMessageHandler(logger: Logger, net: MessagingSystem, nextState: Continuation,
|
private fun setupNextMessageHandler(logger: Logger, net: MessagingService, nextState: Continuation,
|
||||||
otherSide: MessageRecipients, responseType: Class<*>,
|
otherSide: MessageRecipients, responseType: Class<*>,
|
||||||
topic: String, prevPersistedBytes: ByteArray?) {
|
topic: String, prevPersistedBytes: ByteArray?) {
|
||||||
val checkpoint = Checkpoint(nextState, otherSide, logger.name, topic, responseType.name)
|
val checkpoint = Checkpoint(nextState, otherSide, logger.name, topic, responseType.name)
|
||||||
|
@ -13,7 +13,7 @@ package core.testutils
|
|||||||
import com.google.common.io.BaseEncoding
|
import com.google.common.io.BaseEncoding
|
||||||
import contracts.*
|
import contracts.*
|
||||||
import core.*
|
import core.*
|
||||||
import core.messaging.MessagingSystem
|
import core.messaging.MessagingService
|
||||||
import core.visualiser.GraphVisualiser
|
import core.visualiser.GraphVisualiser
|
||||||
import java.io.ByteArrayInputStream
|
import java.io.ByteArrayInputStream
|
||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
@ -121,7 +121,7 @@ class MockStorageService : StorageService {
|
|||||||
class MockServices(
|
class MockServices(
|
||||||
val wallet: WalletService?,
|
val wallet: WalletService?,
|
||||||
val keyManagement: KeyManagementService?,
|
val keyManagement: KeyManagementService?,
|
||||||
val net: MessagingSystem?,
|
val net: MessagingService?,
|
||||||
val identity: IdentityService? = MockIdentityService,
|
val identity: IdentityService? = MockIdentityService,
|
||||||
val storage: StorageService? = MockStorageService(),
|
val storage: StorageService? = MockStorageService(),
|
||||||
val timestamping: TimestamperService? = DUMMY_TIMESTAMPER
|
val timestamping: TimestamperService? = DUMMY_TIMESTAMPER
|
||||||
@ -134,7 +134,7 @@ class MockServices(
|
|||||||
get() = identity ?: throw UnsupportedOperationException()
|
get() = identity ?: throw UnsupportedOperationException()
|
||||||
override val timestampingService: TimestamperService
|
override val timestampingService: TimestamperService
|
||||||
get() = timestamping ?: throw UnsupportedOperationException()
|
get() = timestamping ?: throw UnsupportedOperationException()
|
||||||
override val networkService: MessagingSystem
|
override val networkService: MessagingService
|
||||||
get() = net ?: throw UnsupportedOperationException()
|
get() = net ?: throw UnsupportedOperationException()
|
||||||
override val storageService: StorageService
|
override val storageService: StorageService
|
||||||
get() = storage ?: throw UnsupportedOperationException()
|
get() = storage ?: throw UnsupportedOperationException()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user