mirror of
https://github.com/corda/corda.git
synced 2025-01-26 14:19:23 +00:00
Merge pull request #624 from corda/mnesbit-ServiceHub-refactor
Remove Messaging service from service hub
This commit is contained in:
commit
99bf98c0d8
@ -1,205 +1,5 @@
|
||||
package net.corda.core.messaging
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.catch
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code.
|
||||
*
|
||||
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
|
||||
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
|
||||
* _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed.
|
||||
*
|
||||
* Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer
|
||||
* is *reliable* and as such messages may be stored to disk once queued.
|
||||
*/
|
||||
@ThreadSafe
|
||||
interface MessagingService {
|
||||
/**
|
||||
* The provided function will be invoked for each received message whose topic matches the given string. The callback
|
||||
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
|
||||
*
|
||||
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
||||
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
|
||||
* itself and yet addMessageHandler hasn't returned the handle yet.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
|
||||
* @param sessionID identifier for the session the message is part of. For services listening before
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
* The provided function will be invoked for each received message whose topic and session matches. The callback
|
||||
* will run on the main server thread provided when the messaging service is constructed, and a database
|
||||
* transaction is set up for you automatically.
|
||||
*
|
||||
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
||||
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
|
||||
* itself and yet addMessageHandler hasn't returned the handle yet.
|
||||
*
|
||||
* @param topicSession identifier for the topic and session to listen for messages arriving on.
|
||||
*/
|
||||
fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
|
||||
* this method has returned, although executions that are currently in flight will not be interrupted.
|
||||
*
|
||||
* @throws IllegalArgumentException if the given registration isn't valid for this messaging service.
|
||||
* @throws IllegalStateException if the given registration was already de-registered.
|
||||
*/
|
||||
fun removeMessageHandler(registration: MessageHandlerRegistration)
|
||||
|
||||
/**
|
||||
* Sends a message to the given receiver. The details of how receivers are identified is up to the messaging
|
||||
* implementation: the type system provides an opaque high level view, with more fine grained control being
|
||||
* available via type casting. Once this function returns the message is queued for delivery but not necessarily
|
||||
* delivered: if the recipients are offline then the message could be queued hours or days later.
|
||||
*
|
||||
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
|
||||
* to send an ACK message back.
|
||||
*/
|
||||
fun send(message: Message, target: MessageRecipients)
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topicSession identifier for the topic and session the message is sent to.
|
||||
*/
|
||||
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
|
||||
|
||||
/** Given information about either a specific node or a service returns its corresponding address */
|
||||
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
|
||||
|
||||
/** Returns an address that refers to this node. */
|
||||
val myAddress: SingleMessageRecipient
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* Must not be blank.
|
||||
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
|
||||
* construction of a session, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
|
||||
= createMessage(TopicSession(topic, sessionID), data)
|
||||
|
||||
/**
|
||||
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
|
||||
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
|
||||
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is
|
||||
* automatically deregistered before the callback runs.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
|
||||
* @param sessionID identifier for the session the message is part of. For services listening before
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit)
|
||||
= runOnNextMessage(TopicSession(topic, sessionID), callback)
|
||||
|
||||
/**
|
||||
* Registers a handler for the given topic and session that runs the given callback with the message and then removes
|
||||
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
|
||||
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler].
|
||||
*
|
||||
* @param topicSession identifier for the topic and session to listen for messages arriving on.
|
||||
*/
|
||||
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) {
|
||||
val consumed = AtomicBoolean()
|
||||
addMessageHandler(topicSession) { msg, reg ->
|
||||
removeMessageHandler(reg)
|
||||
check(!consumed.getAndSet(true)) { "Called more than once" }
|
||||
check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" }
|
||||
callback(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
|
||||
* The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future.
|
||||
*/
|
||||
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> {
|
||||
val messageFuture = SettableFuture.create<M>()
|
||||
runOnNextMessage(topic, sessionId) { message ->
|
||||
messageFuture.catch {
|
||||
message.data.deserialize<M>()
|
||||
}
|
||||
}
|
||||
return messageFuture
|
||||
}
|
||||
|
||||
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||
= send(TopicSession(topic, sessionID), payload, to, uuid)
|
||||
|
||||
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to)
|
||||
|
||||
interface MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
* An identifier for the endpoint [MessagingService] message handlers listen at.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
|
||||
* @param sessionID identifier for the session the message is part of. For services listening before
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
|
||||
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
|
||||
override fun toString(): String = "$topic.$sessionID"
|
||||
}
|
||||
|
||||
/**
|
||||
* A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in
|
||||
* Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor
|
||||
* specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage".
|
||||
*
|
||||
* The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the
|
||||
* message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on.
|
||||
* These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of
|
||||
* the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id.
|
||||
*/
|
||||
interface Message {
|
||||
val topicSession: TopicSession
|
||||
val data: ByteArray
|
||||
val debugTimestamp: Instant
|
||||
val uniqueMessageId: UUID
|
||||
}
|
||||
|
||||
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
|
||||
// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC,
|
||||
// or something like that.
|
||||
interface ReceivedMessage : Message {
|
||||
/** The authenticated sender. */
|
||||
val peer: X500Name
|
||||
/** Platform version of the sender's node. */
|
||||
val platformVersion: Int
|
||||
}
|
||||
|
||||
/** A singleton that's useful for validating topic strings */
|
||||
object TopicStringValidator {
|
||||
private val regex = "[a-zA-Z0-9.]+".toPattern()
|
||||
/** @throws IllegalArgumentException if the given topic contains invalid characters */
|
||||
fun check(tag: String) = require(regex.matcher(tag).matches())
|
||||
}
|
||||
|
||||
/** The interface for a group of message recipients (which may contain only one recipient) */
|
||||
interface MessageRecipients
|
||||
@ -212,10 +12,3 @@ interface MessageRecipientGroup : MessageRecipients
|
||||
|
||||
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
|
||||
interface AllPossibleRecipients : MessageRecipients
|
||||
|
||||
/**
|
||||
* A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement
|
||||
* from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response.
|
||||
*/
|
||||
@CordaSerializable
|
||||
object Ack : DeserializeAsKotlinObjectDef
|
||||
|
@ -2,10 +2,6 @@ package net.corda.core.node
|
||||
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.keys
|
||||
import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.messaging.MessagingService
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import java.security.KeyPair
|
||||
@ -41,7 +37,6 @@ interface ServicesForResolution {
|
||||
interface ServiceHub : ServicesForResolution {
|
||||
val vaultService: VaultService
|
||||
val keyManagementService: KeyManagementService
|
||||
val networkService: MessagingService
|
||||
override val storageService: StorageService
|
||||
val networkMapCache: NetworkMapCache
|
||||
val schedulerService: SchedulerService
|
||||
|
@ -1,11 +1,8 @@
|
||||
package net.corda.core.node.services
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.messaging.MessagingService
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.randomOrNull
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
@ -119,39 +116,4 @@ interface NetworkMapCache {
|
||||
"Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.")
|
||||
return notary.advertisedServices.any { it.info.type.isValidatingNotary() }
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
|
||||
* updates.
|
||||
* @param net the network messaging service.
|
||||
* @param networkMapAddress the network map service to fetch current state from.
|
||||
* @param subscribe if the cache should subscribe to updates.
|
||||
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
|
||||
* version is less than or equal to the given version, no update is fetched.
|
||||
*/
|
||||
fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient,
|
||||
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
|
||||
|
||||
/** Adds a node to the local cache (generally only used for adding ourselves). */
|
||||
fun addNode(node: NodeInfo)
|
||||
|
||||
/** Removes a node from the local cache. */
|
||||
fun removeNode(node: NodeInfo)
|
||||
|
||||
/**
|
||||
* Deregister from updates from the given map service.
|
||||
* @param net the network messaging service.
|
||||
* @param service the network map service to fetch current state from.
|
||||
*/
|
||||
fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
|
||||
|
||||
/** For testing where the network map cache is manipulated marks the service as immediately ready. */
|
||||
@VisibleForTesting
|
||||
fun runWithoutMapService()
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
sealed class NetworkCacheError : Exception() {
|
||||
/** Indicates a failure to deregister, because of a rejected request from the remote node */
|
||||
class DeregistrationFailed : NetworkCacheError()
|
||||
}
|
||||
|
@ -3,7 +3,7 @@ package net.corda.core.serialization
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.google.common.primitives.Ints
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.messaging.Ack
|
||||
import net.corda.node.services.messaging.Ack
|
||||
import net.corda.node.services.persistence.NodeAttachmentService
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
|
@ -109,12 +109,3 @@ the validated user is the username itself and the RPC framework uses this to det
|
||||
|
||||
The broker also does host verification when connecting to another peer. It checks that the TLS certificate common name
|
||||
matches with the advertised legal name from the network map service.
|
||||
|
||||
Messaging types
|
||||
---------------
|
||||
|
||||
Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``.
|
||||
An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the
|
||||
messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are
|
||||
identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future
|
||||
versions perhaps a routing path through the network.
|
||||
|
@ -5,16 +5,16 @@ import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.*
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.messaging.createMessage
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.flows.ServiceRequestMessage
|
||||
import net.corda.flows.sendRequest
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.messaging.ServiceRequestMessage
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.ServiceIdentityGenerator
|
||||
|
@ -8,9 +8,9 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.BOB
|
||||
import net.corda.flows.sendRequest
|
||||
import net.corda.node.internal.NetworkMapInfo
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
|
||||
import net.corda.node.services.network.NodeRegistration
|
||||
|
@ -32,6 +32,8 @@ import net.corda.node.services.events.NodeSchedulerService
|
||||
import net.corda.node.services.events.ScheduledActivityObserver
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.InMemoryNetworkMapCache
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
|
||||
@ -112,8 +114,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
protected val partyKeys = mutableSetOf<KeyPair>()
|
||||
|
||||
val services = object : ServiceHubInternal() {
|
||||
override val networkService: MessagingServiceInternal get() = net
|
||||
override val networkMapCache: NetworkMapCache get() = netMapCache
|
||||
override val networkService: MessagingService get() = net
|
||||
override val networkMapCache: NetworkMapCacheInternal get() = netMapCache
|
||||
override val storageService: TxWritableStorageService get() = storage
|
||||
override val vaultService: VaultService get() = vault
|
||||
override val keyManagementService: KeyManagementService get() = keyManagement
|
||||
@ -162,8 +164,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
var inNodeNetworkMapService: NetworkMapService? = null
|
||||
lateinit var txVerifierService: TransactionVerifierService
|
||||
lateinit var identity: IdentityService
|
||||
lateinit var net: MessagingServiceInternal
|
||||
lateinit var netMapCache: NetworkMapCache
|
||||
lateinit var net: MessagingService
|
||||
lateinit var netMapCache: NetworkMapCacheInternal
|
||||
lateinit var scheduler: NodeSchedulerService
|
||||
lateinit var flowLogicFactory: FlowLogicRefFactory
|
||||
lateinit var schemas: SchemaService
|
||||
@ -528,7 +530,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
runOnStop.clear()
|
||||
}
|
||||
|
||||
protected abstract fun makeMessagingService(): MessagingServiceInternal
|
||||
protected abstract fun makeMessagingService(): MessagingService
|
||||
|
||||
protected abstract fun startMessagingService(rpcOps: RPCOps)
|
||||
|
||||
|
@ -19,9 +19,9 @@ import net.corda.node.printBasicNodeInfo
|
||||
import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
|
||||
@ -109,7 +109,7 @@ class Node(override val configuration: FullNodeConfiguration,
|
||||
|
||||
private lateinit var userService: RPCUserService
|
||||
|
||||
override fun makeMessagingService(): MessagingServiceInternal {
|
||||
override fun makeMessagingService(): MessagingService {
|
||||
userService = RPCUserServiceImpl(configuration.rpcUsers)
|
||||
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
|
||||
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
|
||||
|
@ -1,13 +1,10 @@
|
||||
package net.corda.node.services.api
|
||||
|
||||
import net.corda.core.messaging.Message
|
||||
import net.corda.core.messaging.MessageHandlerRegistration
|
||||
import net.corda.core.messaging.createMessage
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.flows.ServiceRequestMessage
|
||||
import net.corda.node.services.messaging.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
@ -16,7 +13,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
abstract class AbstractNodeService(val services: ServiceHubInternal) : SingletonSerializeAsToken() {
|
||||
|
||||
val net: MessagingServiceInternal get() = services.networkService
|
||||
val net: MessagingService get() = services.networkService
|
||||
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
|
@ -6,38 +6,56 @@ import net.corda.core.flows.FlowInitiator
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.messaging.MessagingService
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.PluginServiceHub
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.TxWritableStorageService
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.ServiceFlowInfo
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
|
||||
interface MessagingServiceInternal : MessagingService {
|
||||
interface NetworkMapCacheInternal : NetworkMapCache {
|
||||
/**
|
||||
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
|
||||
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
|
||||
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
|
||||
* it returns immediately and shutdown is asynchronous.
|
||||
* Deregister from updates from the given map service.
|
||||
* @param net the network messaging service.
|
||||
* @param service the network map service to fetch current state from.
|
||||
*/
|
||||
fun stop()
|
||||
fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit>
|
||||
|
||||
/**
|
||||
* Add a network map service; fetches a copy of the latest map from the service and subscribes to any further
|
||||
* updates.
|
||||
* @param net the network messaging service.
|
||||
* @param networkMapAddress the network map service to fetch current state from.
|
||||
* @param subscribe if the cache should subscribe to updates.
|
||||
* @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map
|
||||
* version is less than or equal to the given version, no update is fetched.
|
||||
*/
|
||||
fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient,
|
||||
subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture<Unit>
|
||||
|
||||
/** Adds a node to the local cache (generally only used for adding ourselves). */
|
||||
fun addNode(node: NodeInfo)
|
||||
|
||||
/** Removes a node from the local cache. */
|
||||
fun removeNode(node: NodeInfo)
|
||||
|
||||
/** For testing where the network map cache is manipulated marks the service as immediately ready. */
|
||||
@VisibleForTesting
|
||||
fun runWithoutMapService()
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||
* on the messaging service interface until you have successfully started up the system. One of these objects should
|
||||
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
|
||||
* may let you cast the returned future to an object that lets you get status info.
|
||||
*
|
||||
* A specific implementation of the controller class will have extra features that let you customise it before starting
|
||||
* it up.
|
||||
*/
|
||||
interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
|
||||
fun start(): ListenableFuture<out T>
|
||||
@CordaSerializable
|
||||
sealed class NetworkCacheError : Exception() {
|
||||
/** Indicates a failure to deregister, because of a rejected request from the remote node */
|
||||
class DeregistrationFailed : NetworkCacheError()
|
||||
}
|
||||
|
||||
|
||||
abstract class ServiceHubInternal : PluginServiceHub {
|
||||
companion object {
|
||||
private val log = loggerFor<ServiceHubInternal>()
|
||||
@ -46,8 +64,9 @@ abstract class ServiceHubInternal : PluginServiceHub {
|
||||
abstract val monitoringService: MonitoringService
|
||||
abstract val flowLogicRefFactory: FlowLogicRefFactory
|
||||
abstract val schemaService: SchemaService
|
||||
abstract override val networkMapCache: NetworkMapCacheInternal
|
||||
|
||||
abstract override val networkService: MessagingServiceInternal
|
||||
abstract val networkService: MessagingService
|
||||
|
||||
/**
|
||||
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
|
||||
|
@ -0,0 +1,232 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.catch
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code.
|
||||
*
|
||||
* A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the
|
||||
* membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message
|
||||
* _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed.
|
||||
*
|
||||
* Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer
|
||||
* is *reliable* and as such messages may be stored to disk once queued.
|
||||
*/
|
||||
@ThreadSafe
|
||||
interface MessagingService {
|
||||
/**
|
||||
* The provided function will be invoked for each received message whose topic matches the given string. The callback
|
||||
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
|
||||
*
|
||||
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
||||
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
|
||||
* itself and yet addMessageHandler hasn't returned the handle yet.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
|
||||
* @param sessionID identifier for the session the message is part of. For services listening before
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
* The provided function will be invoked for each received message whose topic and session matches. The callback
|
||||
* will run on the main server thread provided when the messaging service is constructed, and a database
|
||||
* transaction is set up for you automatically.
|
||||
*
|
||||
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
||||
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
|
||||
* itself and yet addMessageHandler hasn't returned the handle yet.
|
||||
*
|
||||
* @param topicSession identifier for the topic and session to listen for messages arriving on.
|
||||
*/
|
||||
fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
|
||||
* this method has returned, although executions that are currently in flight will not be interrupted.
|
||||
*
|
||||
* @throws IllegalArgumentException if the given registration isn't valid for this messaging service.
|
||||
* @throws IllegalStateException if the given registration was already de-registered.
|
||||
*/
|
||||
fun removeMessageHandler(registration: MessageHandlerRegistration)
|
||||
|
||||
/**
|
||||
* Sends a message to the given receiver. The details of how receivers are identified is up to the messaging
|
||||
* implementation: the type system provides an opaque high level view, with more fine grained control being
|
||||
* available via type casting. Once this function returns the message is queued for delivery but not necessarily
|
||||
* delivered: if the recipients are offline then the message could be queued hours or days later.
|
||||
*
|
||||
* There is no way to know if a message has been received. If your flow requires this, you need the recipient
|
||||
* to send an ACK message back.
|
||||
*/
|
||||
fun send(message: Message, target: MessageRecipients)
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topicSession identifier for the topic and session the message is sent to.
|
||||
*/
|
||||
fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message
|
||||
|
||||
/** Given information about either a specific node or a service returns its corresponding address */
|
||||
fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients
|
||||
|
||||
/** Returns an address that refers to this node. */
|
||||
val myAddress: SingleMessageRecipient
|
||||
|
||||
/**
|
||||
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
|
||||
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
|
||||
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
|
||||
* it returns immediately and shutdown is asynchronous.
|
||||
*/
|
||||
fun stop()
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an initialised [Message] with the current time, etc, already filled in.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* Must not be blank.
|
||||
* @param sessionID identifier for the session the message is part of. For messages sent to services before the
|
||||
* construction of a session, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message
|
||||
= createMessage(TopicSession(topic, sessionID), data)
|
||||
|
||||
/**
|
||||
* Registers a handler for the given topic and session ID that runs the given callback with the message and then removes
|
||||
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
|
||||
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is
|
||||
* automatically deregistered before the callback runs.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
|
||||
* @param sessionID identifier for the session the message is part of. For services listening before
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit)
|
||||
= runOnNextMessage(TopicSession(topic, sessionID), callback)
|
||||
|
||||
/**
|
||||
* Registers a handler for the given topic and session that runs the given callback with the message and then removes
|
||||
* itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback
|
||||
* doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler].
|
||||
*
|
||||
* @param topicSession identifier for the topic and session to listen for messages arriving on.
|
||||
*/
|
||||
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) {
|
||||
val consumed = AtomicBoolean()
|
||||
addMessageHandler(topicSession) { msg, reg ->
|
||||
removeMessageHandler(reg)
|
||||
check(!consumed.getAndSet(true)) { "Called more than once" }
|
||||
check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" }
|
||||
callback(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId.
|
||||
* The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future.
|
||||
*/
|
||||
fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture<M> {
|
||||
val messageFuture = SettableFuture.create<M>()
|
||||
runOnNextMessage(topic, sessionId) { message ->
|
||||
messageFuture.catch {
|
||||
message.data.deserialize<M>()
|
||||
}
|
||||
}
|
||||
return messageFuture
|
||||
}
|
||||
|
||||
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||
= send(TopicSession(topic, sessionID), payload, to, uuid)
|
||||
|
||||
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
|
||||
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to)
|
||||
|
||||
/**
|
||||
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||
* on the messaging service interface until you have successfully started up the system. One of these objects should
|
||||
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
|
||||
* may let you cast the returned future to an object that lets you get status info.
|
||||
*
|
||||
* A specific implementation of the controller class will have extra features that let you customise it before starting
|
||||
* it up.
|
||||
*/
|
||||
interface MessagingServiceBuilder<out T : MessagingService> {
|
||||
fun start(): ListenableFuture<out T>
|
||||
}
|
||||
|
||||
interface MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
* An identifier for the endpoint [MessagingService] message handlers listen at.
|
||||
*
|
||||
* @param topic identifier for the general subject of the message, for example "platform.network_map.fetch".
|
||||
* The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]).
|
||||
* @param sessionID identifier for the session the message is part of. For services listening before
|
||||
* a session is established, use [DEFAULT_SESSION_ID].
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
|
||||
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
|
||||
override fun toString(): String = "$topic.$sessionID"
|
||||
}
|
||||
|
||||
/**
|
||||
* A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in
|
||||
* Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor
|
||||
* specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage".
|
||||
*
|
||||
* The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the
|
||||
* message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on.
|
||||
* These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of
|
||||
* the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id.
|
||||
*/
|
||||
interface Message {
|
||||
val topicSession: TopicSession
|
||||
val data: ByteArray
|
||||
val debugTimestamp: Instant
|
||||
val uniqueMessageId: UUID
|
||||
}
|
||||
|
||||
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
|
||||
// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC,
|
||||
// or something like that.
|
||||
interface ReceivedMessage : Message {
|
||||
/** The authenticated sender. */
|
||||
val peer: X500Name
|
||||
/** Platform version of the sender's node. */
|
||||
val platformVersion: Int
|
||||
}
|
||||
|
||||
/** A singleton that's useful for validating topic strings */
|
||||
object TopicStringValidator {
|
||||
private val regex = "[a-zA-Z0-9.]+".toPattern()
|
||||
/** @throws IllegalArgumentException if the given topic contains invalid characters */
|
||||
fun check(tag: String) = require(regex.matcher(tag).matches())
|
||||
}
|
||||
|
||||
/**
|
||||
* A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement
|
||||
* from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response.
|
||||
*/
|
||||
@CordaSerializable
|
||||
object Ack : DeserializeAsKotlinObjectDef
|
@ -3,7 +3,10 @@ package net.corda.node.services.messaging
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.VersionInfo
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
@ -15,7 +18,6 @@ import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
@ -75,7 +77,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
val database: Database,
|
||||
val networkMapRegistrationFuture: ListenableFuture<Unit>,
|
||||
val monitoringService: MonitoringService
|
||||
) : ArtemisMessagingComponent(), MessagingServiceInternal {
|
||||
) : ArtemisMessagingComponent(), MessagingService {
|
||||
companion object {
|
||||
private val log = loggerFor<NodeMessagingClient>()
|
||||
|
||||
|
@ -1,7 +1,8 @@
|
||||
package net.corda.flows
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
|
@ -6,20 +6,20 @@ import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.map
|
||||
import net.corda.core.messaging.MessagingService
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.messaging.createMessage
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.NetworkCacheError
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.flows.sendRequest
|
||||
import net.corda.node.services.api.NetworkCacheError
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
|
||||
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* Extremely simple in-memory cache of the network map.
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCache {
|
||||
open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCacheInternal {
|
||||
companion object {
|
||||
val logger = loggerFor<InMemoryNetworkMapCache>()
|
||||
}
|
||||
|
@ -3,10 +3,8 @@ package net.corda.node.services.network
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.messaging.MessageHandlerRegistration
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.messaging.createMessage
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
@ -17,9 +15,11 @@ import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.flows.ServiceRequestMessage
|
||||
import net.corda.node.services.api.AbstractNodeService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.messaging.MessageHandlerRegistration
|
||||
import net.corda.node.services.messaging.ServiceRequestMessage
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.network.NetworkMapService.*
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC
|
||||
|
@ -18,9 +18,6 @@ import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.commonName
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.messaging.ReceivedMessage
|
||||
import net.corda.core.messaging.TopicSession
|
||||
import net.corda.core.messaging.send
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
@ -29,6 +26,9 @@ import net.corda.node.internal.ServiceFlowInfo
|
||||
import net.corda.node.services.api.Checkpoint
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.messaging.TopicSession
|
||||
import net.corda.node.services.messaging.send
|
||||
import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
|
@ -1,10 +1,10 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.messaging.Message
|
||||
import net.corda.core.messaging.TopicStringValidator
|
||||
import net.corda.core.messaging.createMessage
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.messaging.TopicStringValidator
|
||||
import net.corda.node.services.messaging.createMessage
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import org.junit.Test
|
||||
|
@ -11,10 +11,11 @@ import net.corda.core.node.services.*
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.internal.ServiceFlowInfo
|
||||
import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
@ -26,10 +27,10 @@ import java.time.Clock
|
||||
open class MockServiceHubInternal(
|
||||
val customVault: VaultService? = null,
|
||||
val keyManagement: KeyManagementService? = null,
|
||||
val net: MessagingServiceInternal? = null,
|
||||
val net: MessagingService? = null,
|
||||
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
|
||||
val storage: TxWritableStorageService? = MockStorageService(),
|
||||
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
|
||||
val mapCache: NetworkMapCacheInternal? = MockNetworkMapCache(),
|
||||
val scheduler: SchedulerService? = null,
|
||||
val overrideClock: Clock? = NodeClock(),
|
||||
val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(),
|
||||
@ -44,9 +45,9 @@ open class MockServiceHubInternal(
|
||||
get() = keyManagement ?: throw UnsupportedOperationException()
|
||||
override val identityService: IdentityService
|
||||
get() = identity ?: throw UnsupportedOperationException()
|
||||
override val networkService: MessagingServiceInternal
|
||||
override val networkService: MessagingService
|
||||
get() = net ?: throw UnsupportedOperationException()
|
||||
override val networkMapCache: NetworkMapCache
|
||||
override val networkMapCache: NetworkMapCacheInternal
|
||||
get() = mapCache ?: throw UnsupportedOperationException()
|
||||
override val storageService: StorageService
|
||||
get() = storage ?: throw UnsupportedOperationException()
|
||||
|
@ -5,12 +5,8 @@ import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.typesafe.config.ConfigFactory.empty
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.messaging.Message
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.createMessage
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.utilities.ALICE
|
||||
import net.corda.core.utilities.LogHelper
|
||||
|
@ -1,16 +1,19 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.messaging.send
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.flows.sendRequest
|
||||
import net.corda.core.utilities.ALICE
|
||||
import net.corda.core.utilities.BOB
|
||||
import net.corda.core.utilities.CHARLIE
|
||||
import net.corda.core.utilities.DUMMY_MAP
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.send
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Added
|
||||
import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Removed
|
||||
import net.corda.node.services.network.NetworkMapService.*
|
||||
@ -20,18 +23,12 @@ import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC
|
||||
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC
|
||||
import net.corda.node.services.network.NodeRegistration
|
||||
import net.corda.core.utilities.ALICE
|
||||
import net.corda.core.utilities.BOB
|
||||
import net.corda.core.utilities.CHARLIE
|
||||
import net.corda.core.utilities.DUMMY_MAP
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import net.corda.node.utilities.AddOrRemove.ADD
|
||||
import net.corda.node.utilities.AddOrRemove.REMOVE
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNetwork.MockNode
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.eclipse.jetty.util.BlockingArrayQueue
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
|
@ -15,7 +15,6 @@ import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.AnonymousParty
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.messaging.Ack
|
||||
import net.corda.core.node.PluginServiceHub
|
||||
import net.corda.core.node.services.dealsWith
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
@ -24,6 +23,7 @@ import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.AbstractStateReplacementFlow.Proposal
|
||||
import net.corda.flows.StateReplacementException
|
||||
import net.corda.flows.TwoPartyDealFlow
|
||||
import net.corda.node.services.messaging.Ack
|
||||
import net.corda.vega.analytics.*
|
||||
import net.corda.vega.contracts.*
|
||||
import net.corda.vega.portfolio.Portfolio
|
||||
|
@ -6,14 +6,16 @@ import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.messaging.AllPossibleRecipients
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.ServiceEntry
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.MessagingServiceBuilder
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.JDBCHashSet
|
||||
import net.corda.node.utilities.transaction
|
||||
@ -298,7 +300,7 @@ class InMemoryMessagingNetwork(
|
||||
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
|
||||
private val peerHandle: PeerHandle,
|
||||
private val executor: AffinityExecutor,
|
||||
private val database: Database) : SingletonSerializeAsToken(), MessagingServiceInternal {
|
||||
private val database: Database) : SingletonSerializeAsToken(), MessagingService {
|
||||
inner class Handler(val topicSession: TopicSession,
|
||||
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
|
@ -19,9 +19,9 @@ import net.corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.AbstractNode
|
||||
import net.corda.node.internal.ServiceFlowInfo
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.network.InMemoryNetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.statemachine.flowVersion
|
||||
@ -151,7 +151,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
|
||||
// We only need to override the messaging service here, as currently everything that hits disk does so
|
||||
// through the java.nio API which we are already mocking via Jimfs.
|
||||
override fun makeMessagingService(): MessagingServiceInternal {
|
||||
override fun makeMessagingService(): MessagingService {
|
||||
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
|
||||
return mockNet.messagingNetwork.createNodeWithID(
|
||||
!mockNet.threadPerNode,
|
||||
|
@ -4,7 +4,6 @@ import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.contracts.PartyAndReference
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.messaging.MessagingService
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
@ -58,7 +57,6 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub {
|
||||
override val keyManagementService: MockKeyManagementService = MockKeyManagementService(key)
|
||||
|
||||
override val vaultService: VaultService get() = throw UnsupportedOperationException()
|
||||
override val networkService: MessagingService get() = throw UnsupportedOperationException()
|
||||
override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException()
|
||||
override val clock: Clock get() = Clock.systemUTC()
|
||||
override val schedulerService: SchedulerService get() = throw UnsupportedOperationException()
|
||||
|
Loading…
x
Reference in New Issue
Block a user