mirror of
synced 2025-03-04 05:02:38 +00:00
Remove Messaging service from service hub
Remove mention of MessagingService as being on ServiceHub.
This commit is contained in:
@ -1,205 +1,5 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.catch
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import org.bouncycastle.asn1.x500.X500Name
import java.time.Instant
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.ThreadSafe
* A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code.
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
* _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed.
* Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer
* is *reliable* and as such messages may be stored to disk once queued.
interface MessagingService {
* The provided function will be invoked for each received message whose topic matches the given string. The callback
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
* itself and yet addMessageHandler hasn't returned the handle yet.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
* The provided function will be invoked for each received message whose topic and session matches. The callback
* will run on the main server thread provided when the messaging service is constructed, and a database
* transaction is set up for you automatically.
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
* itself and yet addMessageHandler hasn't returned the handle yet.
* @param topicSession identifier for the topic and session to listen for messages arriving on.
fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
* this method has returned, although executions that are currently in flight will not be interrupted.
* @throws IllegalArgumentException if the given registration isn't valid for this messaging service.
* @throws IllegalStateException if the given registration was already de-registered.
fun removeMessageHandler(registration: MessageHandlerRegistration)
* Sends a message to the given receiver. The details of how receivers are identified is up to the messaging
* implementation: the type system provides an opaque high level view, with more fine grained control being
* available via type casting. Once this function returns the message is queued for delivery but not necessarily
* delivered: if the recipients are offline then the message could be queued hours or days later.
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
* to send an ACK message back.
fun send(message: Message, target: MessageRecipients)
* Returns an initialised [Message] with the current time, etc, already filled in.
* @param topicSession identifier for the topic and session the message is sent to.
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
/** Given information about either a specific node or a service returns its corresponding address */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
* Returns an initialised [Message] with the current time, etc, already filled in.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* Must not be blank.
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
* construction of a session, use [DEFAULT_SESSION_ID].
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is
* automatically deregistered before the callback runs.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit)
= runOnNextMessage(TopicSession(topic, sessionID), callback)
* Registers a handler for the given topic and session that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler].
* @param topicSession identifier for the topic and session to listen for messages arriving on.
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) {
val consumed = AtomicBoolean()
addMessageHandler(topicSession) { msg, reg ->
check(!consumed.getAndSet(true)) { "Called more than once" }
check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" }
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
* The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future.
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> {
val messageFuture = SettableFuture.create<M>()
runOnNextMessage(topic, sessionId) { message ->
messageFuture.catch {
return messageFuture
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(TopicSession(topic, sessionID), payload, to, uuid)
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to)
interface MessageHandlerRegistration
* An identifier for the endpoint [MessagingService] message handlers listen at.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
override fun toString(): String = "$topic.$sessionID"
* A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in
* Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor
* specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage".
* The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the
* message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on.
* These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of
* the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id.
interface Message {
val topicSession: TopicSession
val data: ByteArray
val debugTimestamp: Instant
val uniqueMessageId: UUID
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC,
// or something like that.
interface ReceivedMessage : Message {
/** The authenticated sender. */
val peer: X500Name
/** Platform version of the sender's node. */
val platformVersion: Int
/** A singleton that's useful for validating topic strings */
object TopicStringValidator {
private val regex = "[a-zA-Z0-9.]+".toPattern()
/** @throws IllegalArgumentException if the given topic contains invalid characters */
fun check(tag: String) = require(regex.matcher(tag).matches())
/** The interface for a group of message recipients (which may contain only one recipient) */
interface MessageRecipients
@ -212,10 +12,3 @@ interface MessageRecipientGroup : MessageRecipients
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
interface AllPossibleRecipients : MessageRecipients
* A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement
* from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response.
object Ack : DeserializeAsKotlinObjectDef
@ -2,10 +2,6 @@ package net.corda.core.node
import net.corda.core.contracts.*
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.MessagingService
import net.corda.core.node.services.*
import net.corda.core.transactions.SignedTransaction
import java.security.KeyPair
@ -41,7 +37,6 @@ interface ServicesForResolution {
interface ServiceHub : ServicesForResolution {
val vaultService: VaultService
val keyManagementService: KeyManagementService
val networkService: MessagingService
override val storageService: StorageService
val networkMapCache: NetworkMapCache
val schedulerService: SchedulerService
@ -1,11 +1,8 @@
package net.corda.core.node.services
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.Contract
import net.corda.core.crypto.Party
import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.randomOrNull
import net.corda.core.serialization.CordaSerializable
@ -119,39 +116,4 @@ interface NetworkMapCache {
"Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.")
return notary.advertisedServices.any { it.info.type.isValidatingNotary() }
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates.
* @param net the network messaging service.
* @param networkMapAddress the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates.
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
* version is less than or equal to the given version, no update is fetched.
fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient,
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)
/** Removes a node from the local cache. */
fun removeNode(node: NodeInfo)
* Deregister from updates from the given map service.
* @param net the network messaging service.
* @param service the network map service to fetch current state from.
fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
/** For testing where the network map cache is manipulated marks the service as immediately ready. */
fun runWithoutMapService()
sealed class NetworkCacheError : Exception() {
/** Indicates a failure to deregister, because of a rejected request from the remote node */
class DeregistrationFailed : NetworkCacheError()
@ -3,7 +3,7 @@ package net.corda.core.serialization
import com.esotericsoftware.kryo.Kryo
import com.google.common.primitives.Ints
import net.corda.core.crypto.*
import net.corda.core.messaging.Ack
import net.corda.node.services.messaging.Ack
import net.corda.node.services.persistence.NodeAttachmentService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
@ -109,12 +109,3 @@ the validated user is the username itself and the RPC framework uses this to det
The broker also does host verification when connecting to another peer. It checks that the TLS certificate common name
matches with the advertised legal name from the network map service.
Messaging types
Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``.
An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the
messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are
identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future
versions perhaps a routing path through the network.
@ -5,16 +5,16 @@ import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.*
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.flows.ServiceRequestMessage
import net.corda.flows.sendRequest
import net.corda.node.internal.Node
import net.corda.node.services.messaging.ServiceRequestMessage
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.ServiceIdentityGenerator
@ -8,9 +8,9 @@ import net.corda.core.node.NodeInfo
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.core.utilities.BOB
import net.corda.flows.sendRequest
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NodeRegistration
@ -32,6 +32,8 @@ import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
@ -112,8 +114,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected val partyKeys = mutableSetOf<KeyPair>()
val services = object : ServiceHubInternal() {
override val networkService: MessagingServiceInternal get() = net
override val networkMapCache: NetworkMapCache get() = netMapCache
override val networkService: MessagingService get() = net
override val networkMapCache: NetworkMapCacheInternal get() = netMapCache
override val storageService: TxWritableStorageService get() = storage
override val vaultService: VaultService get() = vault
override val keyManagementService: KeyManagementService get() = keyManagement
@ -162,8 +164,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
var inNodeNetworkMapService: NetworkMapService? = null
lateinit var txVerifierService: TransactionVerifierService
lateinit var identity: IdentityService
lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
lateinit var net: MessagingService
lateinit var netMapCache: NetworkMapCacheInternal
lateinit var scheduler: NodeSchedulerService
lateinit var flowLogicFactory: FlowLogicRefFactory
lateinit var schemas: SchemaService
@ -528,7 +530,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun makeMessagingService(): MessagingService
protected abstract fun startMessagingService(rpcOps: RPCOps)
@ -19,9 +19,9 @@ import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
@ -109,7 +109,7 @@ class Node(override val configuration: FullNodeConfiguration,
private lateinit var userService: RPCUserService
override fun makeMessagingService(): MessagingServiceInternal {
override fun makeMessagingService(): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers)
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
@ -1,13 +1,10 @@
package net.corda.node.services.api
import net.corda.core.messaging.Message
import net.corda.core.messaging.MessageHandlerRegistration
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.flows.ServiceRequestMessage
import net.corda.node.services.messaging.*
import javax.annotation.concurrent.ThreadSafe
@ -16,7 +13,7 @@ import javax.annotation.concurrent.ThreadSafe
abstract class AbstractNodeService(val services: ServiceHubInternal) : SingletonSerializeAsToken() {
val net: MessagingServiceInternal get() = services.networkService
val net: MessagingService get() = services.networkService
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
@ -6,38 +6,56 @@ import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowStateMachineImpl
interface MessagingServiceInternal : MessagingService {
interface NetworkMapCacheInternal : NetworkMapCache {
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
* Deregister from updates from the given map service.
* @param net the network messaging service.
* @param service the network map service to fetch current state from.
fun stop()
fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
* updates.
* @param net the network messaging service.
* @param networkMapAddress the network map service to fetch current state from.
* @param subscribe if the cache should subscribe to updates.
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
* version is less than or equal to the given version, no update is fetched.
fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient,
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)
/** Removes a node from the local cache. */
fun removeNode(node: NodeInfo)
/** For testing where the network map cache is manipulated marks the service as immediately ready. */
fun runWithoutMapService()
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
fun start(): ListenableFuture<out T>
sealed class NetworkCacheError : Exception() {
/** Indicates a failure to deregister, because of a rejected request from the remote node */
class DeregistrationFailed : NetworkCacheError()
abstract class ServiceHubInternal : PluginServiceHub {
companion object {
private val log = loggerFor<ServiceHubInternal>()
@ -46,8 +64,9 @@ abstract class ServiceHubInternal : PluginServiceHub {
abstract val monitoringService: MonitoringService
abstract val flowLogicRefFactory: FlowLogicRefFactory
abstract val schemaService: SchemaService
abstract override val networkMapCache: NetworkMapCacheInternal
abstract override val networkService: MessagingServiceInternal
abstract val networkService: MessagingService
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
@ -0,0 +1,232 @@
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.catch
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import org.bouncycastle.asn1.x500.X500Name
import java.time.Instant
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.ThreadSafe
* A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code.
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
* _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed.
* Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer
* is *reliable* and as such messages may be stored to disk once queued.
interface MessagingService {
* The provided function will be invoked for each received message whose topic matches the given string. The callback
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
* itself and yet addMessageHandler hasn't returned the handle yet.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
* The provided function will be invoked for each received message whose topic and session matches. The callback
* will run on the main server thread provided when the messaging service is constructed, and a database
* transaction is set up for you automatically.
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
* itself and yet addMessageHandler hasn't returned the handle yet.
* @param topicSession identifier for the topic and session to listen for messages arriving on.
fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
* this method has returned, although executions that are currently in flight will not be interrupted.
* @throws IllegalArgumentException if the given registration isn't valid for this messaging service.
* @throws IllegalStateException if the given registration was already de-registered.
fun removeMessageHandler(registration: MessageHandlerRegistration)
* Sends a message to the given receiver. The details of how receivers are identified is up to the messaging
* implementation: the type system provides an opaque high level view, with more fine grained control being
* available via type casting. Once this function returns the message is queued for delivery but not necessarily
* delivered: if the recipients are offline then the message could be queued hours or days later.
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
* to send an ACK message back.
fun send(message: Message, target: MessageRecipients)
* Returns an initialised [Message] with the current time, etc, already filled in.
* @param topicSession identifier for the topic and session the message is sent to.
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
/** Given information about either a specific node or a service returns its corresponding address */
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
/** Returns an address that refers to this node. */
val myAddress: SingleMessageRecipient
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
* it returns immediately and shutdown is asynchronous.
fun stop()
* Returns an initialised [Message] with the current time, etc, already filled in.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* Must not be blank.
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
* construction of a session, use [DEFAULT_SESSION_ID].
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is
* automatically deregistered before the callback runs.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit)
= runOnNextMessage(TopicSession(topic, sessionID), callback)
* Registers a handler for the given topic and session that runs the given callback with the message and then removes
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler].
* @param topicSession identifier for the topic and session to listen for messages arriving on.
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) {
val consumed = AtomicBoolean()
addMessageHandler(topicSession) { msg, reg ->
check(!consumed.getAndSet(true)) { "Called more than once" }
check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" }
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
* The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future.
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> {
val messageFuture = SettableFuture.create<M>()
runOnNextMessage(topic, sessionId) { message ->
messageFuture.catch {
return messageFuture
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(TopicSession(topic, sessionID), payload, to, uuid)
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to)
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
interface MessagingServiceBuilder<out T : MessagingService> {
fun start(): ListenableFuture<out T>
interface MessageHandlerRegistration
* An identifier for the endpoint [MessagingService] message handlers listen at.
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
override fun toString(): String = "$topic.$sessionID"
* A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in
* Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor
* specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage".
* The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the
* message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on.
* These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of
* the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id.
interface Message {
val topicSession: TopicSession
val data: ByteArray
val debugTimestamp: Instant
val uniqueMessageId: UUID
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC,
// or something like that.
interface ReceivedMessage : Message {
/** The authenticated sender. */
val peer: X500Name
/** Platform version of the sender's node. */
val platformVersion: Int
/** A singleton that's useful for validating topic strings */
object TopicStringValidator {
private val regex = "[a-zA-Z0-9.]+".toPattern()
/** @throws IllegalArgumentException if the given topic contains invalid characters */
fun check(tag: String) = require(regex.matcher(tag).matches())
* A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement
* from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response.
object Ack : DeserializeAsKotlinObjectDef
@ -3,7 +3,10 @@ package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ThreadBox
import net.corda.core.messaging.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.VersionInfo
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
@ -15,7 +18,6 @@ import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.RPCUserService
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
@ -75,7 +77,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService
) : ArtemisMessagingComponent(), MessagingServiceInternal {
) : ArtemisMessagingComponent(), MessagingService {
companion object {
private val log = loggerFor<NodeMessagingClient>()
@ -1,7 +1,8 @@
package net.corda.flows
package net.corda.node.services.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.messaging.*
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.serialization.CordaSerializable
@ -6,20 +6,20 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party
import net.corda.core.map
import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.createMessage
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.NetworkCacheError
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.flows.sendRequest
import net.corda.node.services.api.NetworkCacheError
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe
* Extremely simple in-memory cache of the network map.
open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCache {
open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCacheInternal {
companion object {
val logger = loggerFor<InMemoryNetworkMapCache>()
@ -3,10 +3,8 @@ package net.corda.node.services.network
import com.google.common.annotations.VisibleForTesting
import net.corda.core.ThreadBox
import net.corda.core.crypto.*
import net.corda.core.messaging.MessageHandlerRegistration
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.createMessage
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.NetworkMapCache
@ -17,9 +15,11 @@ import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.flows.ServiceRequestMessage
import net.corda.node.services.api.AbstractNodeService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.MessageHandlerRegistration
import net.corda.node.services.messaging.ServiceRequestMessage
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.network.NetworkMapService.*
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC
@ -18,9 +18,6 @@ import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.commonName
import net.corda.core.flows.*
import net.corda.core.messaging.ReceivedMessage
import net.corda.core.messaging.TopicSession
import net.corda.core.messaging.send
import net.corda.core.serialization.*
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
@ -29,6 +26,9 @@ import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.TopicSession
import net.corda.node.services.messaging.send
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.jetbrains.exposed.sql.Database
@ -1,10 +1,10 @@
package net.corda.node.messaging
import net.corda.core.messaging.Message
import net.corda.core.messaging.TopicStringValidator
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.TopicStringValidator
import net.corda.node.services.messaging.createMessage
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.node.MockNetwork
import org.junit.Test
@ -11,10 +11,11 @@ import net.corda.core.node.services.*
import net.corda.core.transactions.SignedTransaction
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@ -26,10 +27,10 @@ import java.time.Clock
open class MockServiceHubInternal(
val customVault: VaultService? = null,
val keyManagement: KeyManagementService? = null,
val net: MessagingServiceInternal? = null,
val net: MessagingService? = null,
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
val storage: TxWritableStorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val mapCache: NetworkMapCacheInternal? = MockNetworkMapCache(),
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(),
@ -44,9 +45,9 @@ open class MockServiceHubInternal(
get() = keyManagement ?: throw UnsupportedOperationException()
override val identityService: IdentityService
get() = identity ?: throw UnsupportedOperationException()
override val networkService: MessagingServiceInternal
override val networkService: MessagingService
get() = net ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache
override val networkMapCache: NetworkMapCacheInternal
get() = mapCache ?: throw UnsupportedOperationException()
override val storageService: StorageService
get() = storage ?: throw UnsupportedOperationException()
@ -5,12 +5,8 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.typesafe.config.ConfigFactory.empty
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.generateKeyPair
import net.corda.core.messaging.Message
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.createMessage
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.LogHelper
@ -1,16 +1,19 @@
package net.corda.node.services.network
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.X509Utilities
import net.corda.core.getOrThrow
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.messaging.send
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.core.serialization.deserialize
import net.corda.flows.sendRequest
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.BOB
import net.corda.core.utilities.CHARLIE
import net.corda.core.utilities.DUMMY_MAP
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.send
import net.corda.node.services.messaging.sendRequest
import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Added
import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Removed
import net.corda.node.services.network.NetworkMapService.*
@ -20,18 +23,12 @@ import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC
import net.corda.node.services.network.NodeRegistration
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.BOB
import net.corda.core.utilities.CHARLIE
import net.corda.core.utilities.DUMMY_MAP
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AddOrRemove.REMOVE
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.eclipse.jetty.util.BlockingArrayQueue
import org.junit.After
import org.junit.Before
@ -15,7 +15,6 @@ import net.corda.core.contracts.StateRef
import net.corda.core.crypto.AnonymousParty
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.Ack
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.dealsWith
import net.corda.core.serialization.CordaSerializable
@ -24,6 +23,7 @@ import net.corda.core.utilities.unwrap
import net.corda.flows.AbstractStateReplacementFlow.Proposal
import net.corda.flows.StateReplacementException
import net.corda.flows.TwoPartyDealFlow
import net.corda.node.services.messaging.Ack
import net.corda.vega.analytics.*
import net.corda.vega.contracts.*
import net.corda.vega.portfolio.Portfolio
@ -6,14 +6,16 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.ThreadBox
import net.corda.core.crypto.X509Utilities
import net.corda.core.getOrThrow
import net.corda.core.messaging.*
import net.corda.core.messaging.AllPossibleRecipients
import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.ServiceEntry
import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.trace
import net.corda.node.services.api.MessagingServiceBuilder
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.messaging.*
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JDBCHashSet
import net.corda.node.utilities.transaction
@ -298,7 +300,7 @@ class InMemoryMessagingNetwork(
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val peerHandle: PeerHandle,
private val executor: AffinityExecutor,
private val database: Database) : SingletonSerializeAsToken(), MessagingServiceInternal {
private val database: Database) : SingletonSerializeAsToken(), MessagingService {
inner class Handler(val topicSession: TopicSession,
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -19,9 +19,9 @@ import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.flowVersion
@ -151,7 +151,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingServiceInternal {
override fun makeMessagingService(): MessagingService {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(
@ -4,7 +4,6 @@ import net.corda.core.contracts.Attachment
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.*
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.MessagingService
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
@ -58,7 +57,6 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub {
override val keyManagementService: MockKeyManagementService = MockKeyManagementService(key)
override val vaultService: VaultService get() = throw UnsupportedOperationException()
override val networkService: MessagingService get() = throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException()
override val clock: Clock get() = Clock.systemUTC()
override val schedulerService: SchedulerService get() = throw UnsupportedOperationException()
Reference in New Issue
Block a user