From 7e8aa1d706257b35405d8fdcc758a881fdb17479 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 3 May 2017 14:24:31 +0100 Subject: [PATCH] Remove Messaging service from service hub Remove mention of MessagingService as being on ServiceHub. --- .../net/corda/core/messaging/Messaging.kt | 207 ---------------- .../kotlin/net/corda/core/node/ServiceHub.kt | 5 - .../core/node/services/NetworkMapCache.kt | 38 --- .../net/corda/core/serialization/KryoTests.kt | 2 +- docs/source/messaging.rst | 9 - .../services/messaging/P2PMessagingTest.kt | 6 +- .../services/messaging/P2PSecurityTest.kt | 2 +- .../net/corda/node/internal/AbstractNode.kt | 12 +- .../kotlin/net/corda/node/internal/Node.kt | 4 +- .../node/services/api/AbstractNodeService.kt | 7 +- .../node/services/api/ServiceHubInternal.kt | 59 +++-- .../node/services/messaging/Messaging.kt | 232 ++++++++++++++++++ .../services/messaging/NodeMessagingClient.kt | 8 +- .../messaging}/ServiceRequestMessage.kt | 5 +- .../network/InMemoryNetworkMapCache.kt | 12 +- .../services/network/NetworkMapService.kt | 6 +- .../statemachine/StateMachineManager.kt | 6 +- .../node/messaging/InMemoryMessagingTests.kt | 6 +- .../node/services/MockServiceHubInternal.kt | 11 +- .../messaging/ArtemisMessagingTests.kt | 4 - .../network/AbstractNetworkMapServiceTest.kt | 15 +- .../kotlin/net/corda/vega/flows/SimmFlow.kt | 2 +- .../testing/node/InMemoryMessagingNetwork.kt | 10 +- .../kotlin/net/corda/testing/node/MockNode.kt | 4 +- .../net/corda/testing/node/MockServices.kt | 2 - 25 files changed, 331 insertions(+), 343 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt rename {core/src/main/kotlin/net/corda/flows => node/src/main/kotlin/net/corda/node/services/messaging}/ServiceRequestMessage.kt (85%) diff --git a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt index ea948ca162..44a297e979 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt @@ -1,205 +1,5 @@ package net.corda.core.messaging -import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.SettableFuture -import net.corda.core.catch -import net.corda.core.node.services.DEFAULT_SESSION_ID -import net.corda.core.node.services.PartyInfo -import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.DeserializeAsKotlinObjectDef -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import org.bouncycastle.asn1.x500.X500Name -import java.time.Instant -import java.util.* -import java.util.concurrent.atomic.AtomicBoolean -import javax.annotation.concurrent.ThreadSafe - -/** - * A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code. - * - * A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the - * membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message - * _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed. - * - * Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer - * is *reliable* and as such messages may be stored to disk once queued. - */ -@ThreadSafe -interface MessagingService { - /** - * The provided function will be invoked for each received message whose topic matches the given string. The callback - * will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result. - * - * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. - * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister - * itself and yet addMessageHandler hasn't returned the handle yet. - * - * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". - * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). - * @param sessionID identifier for the session the message is part of. For services listening before - * a session is established, use [DEFAULT_SESSION_ID]. - */ - fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration - - /** - * The provided function will be invoked for each received message whose topic and session matches. The callback - * will run on the main server thread provided when the messaging service is constructed, and a database - * transaction is set up for you automatically. - * - * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. - * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister - * itself and yet addMessageHandler hasn't returned the handle yet. - * - * @param topicSession identifier for the topic and session to listen for messages arriving on. - */ - fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration - - /** - * Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once - * this method has returned, although executions that are currently in flight will not be interrupted. - * - * @throws IllegalArgumentException if the given registration isn't valid for this messaging service. - * @throws IllegalStateException if the given registration was already de-registered. - */ - fun removeMessageHandler(registration: MessageHandlerRegistration) - - /** - * Sends a message to the given receiver. The details of how receivers are identified is up to the messaging - * implementation: the type system provides an opaque high level view, with more fine grained control being - * available via type casting. Once this function returns the message is queued for delivery but not necessarily - * delivered: if the recipients are offline then the message could be queued hours or days later. - * - * There is no way to know if a message has been received. If your flow requires this, you need the recipient - * to send an ACK message back. - */ - fun send(message: Message, target: MessageRecipients) - - /** - * Returns an initialised [Message] with the current time, etc, already filled in. - * - * @param topicSession identifier for the topic and session the message is sent to. - */ - fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message - - /** Given information about either a specific node or a service returns its corresponding address */ - fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients - - /** Returns an address that refers to this node. */ - val myAddress: SingleMessageRecipient -} - -/** - * Returns an initialised [Message] with the current time, etc, already filled in. - * - * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". - * Must not be blank. - * @param sessionID identifier for the session the message is part of. For messages sent to services before the - * construction of a session, use [DEFAULT_SESSION_ID]. - */ -fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message - = createMessage(TopicSession(topic, sessionID), data) - -/** - * Registers a handler for the given topic and session ID that runs the given callback with the message and then removes - * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback - * doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is - * automatically deregistered before the callback runs. - * - * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". - * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). - * @param sessionID identifier for the session the message is part of. For services listening before - * a session is established, use [DEFAULT_SESSION_ID]. - */ -fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit) - = runOnNextMessage(TopicSession(topic, sessionID), callback) - -/** - * Registers a handler for the given topic and session that runs the given callback with the message and then removes - * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback - * doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler]. - * - * @param topicSession identifier for the topic and session to listen for messages arriving on. - */ -inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) { - val consumed = AtomicBoolean() - addMessageHandler(topicSession) { msg, reg -> - removeMessageHandler(reg) - check(!consumed.getAndSet(true)) { "Called more than once" } - check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" } - callback(msg) - } -} - -/** - * Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId. - * The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future. - */ -fun MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture { - val messageFuture = SettableFuture.create() - runOnNextMessage(topic, sessionId) { message -> - messageFuture.catch { - message.data.deserialize() - } - } - return messageFuture -} - -fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) - = send(TopicSession(topic, sessionID), payload, to, uuid) - -fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) - = send(createMessage(topicSession, payload.serialize().bytes, uuid), to) - -interface MessageHandlerRegistration - -/** - * An identifier for the endpoint [MessagingService] message handlers listen at. - * - * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". - * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). - * @param sessionID identifier for the session the message is part of. For services listening before - * a session is established, use [DEFAULT_SESSION_ID]. - */ -@CordaSerializable -data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) { - fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID - override fun toString(): String = "$topic.$sessionID" -} - -/** - * A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in - * Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor - * specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage". - * - * The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the - * message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on. - * These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of - * the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id. - */ -interface Message { - val topicSession: TopicSession - val data: ByteArray - val debugTimestamp: Instant - val uniqueMessageId: UUID -} - -// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that. -// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC, -// or something like that. -interface ReceivedMessage : Message { - /** The authenticated sender. */ - val peer: X500Name - /** Platform version of the sender's node. */ - val platformVersion: Int -} - -/** A singleton that's useful for validating topic strings */ -object TopicStringValidator { - private val regex = "[a-zA-Z0-9.]+".toPattern() - /** @throws IllegalArgumentException if the given topic contains invalid characters */ - fun check(tag: String) = require(regex.matcher(tag).matches()) -} /** The interface for a group of message recipients (which may contain only one recipient) */ interface MessageRecipients @@ -212,10 +12,3 @@ interface MessageRecipientGroup : MessageRecipients /** A special base class for the set of all possible recipients, without having to identify who they all are. */ interface AllPossibleRecipients : MessageRecipients - -/** - * A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement - * from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response. - */ -@CordaSerializable -object Ack : DeserializeAsKotlinObjectDef diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 1064d537ed..3cc519c088 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -2,10 +2,6 @@ package net.corda.core.node import net.corda.core.contracts.* import net.corda.core.crypto.keys -import net.corda.core.flows.FlowInitiator -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowStateMachine -import net.corda.core.messaging.MessagingService import net.corda.core.node.services.* import net.corda.core.transactions.SignedTransaction import java.security.KeyPair @@ -41,7 +37,6 @@ interface ServicesForResolution { interface ServiceHub : ServicesForResolution { val vaultService: VaultService val keyManagementService: KeyManagementService - val networkService: MessagingService override val storageService: StorageService val networkMapCache: NetworkMapCache val schedulerService: SchedulerService diff --git a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt index 480957e4ac..d8dacf4611 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NetworkMapCache.kt @@ -1,11 +1,8 @@ package net.corda.core.node.services -import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.Contract import net.corda.core.crypto.Party -import net.corda.core.messaging.MessagingService -import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo import net.corda.core.randomOrNull import net.corda.core.serialization.CordaSerializable @@ -119,39 +116,4 @@ interface NetworkMapCache { "Your options are: ${notaryNodes.map { "\"${it.notaryIdentity.name}\"" }.joinToString()}.") return notary.advertisedServices.any { it.info.type.isValidatingNotary() } } - - /** - * Add a network map service; fetches a copy of the latest map from the service and subscribes to any further - * updates. - * @param net the network messaging service. - * @param networkMapAddress the network map service to fetch current state from. - * @param subscribe if the cache should subscribe to updates. - * @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map - * version is less than or equal to the given version, no update is fetched. - */ - fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient, - subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture - - /** Adds a node to the local cache (generally only used for adding ourselves). */ - fun addNode(node: NodeInfo) - - /** Removes a node from the local cache. */ - fun removeNode(node: NodeInfo) - - /** - * Deregister from updates from the given map service. - * @param net the network messaging service. - * @param service the network map service to fetch current state from. - */ - fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture - - /** For testing where the network map cache is manipulated marks the service as immediately ready. */ - @VisibleForTesting - fun runWithoutMapService() -} - -@CordaSerializable -sealed class NetworkCacheError : Exception() { - /** Indicates a failure to deregister, because of a rejected request from the remote node */ - class DeregistrationFailed : NetworkCacheError() } diff --git a/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt b/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt index e030fee171..ddde679fd8 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/KryoTests.kt @@ -3,7 +3,7 @@ package net.corda.core.serialization import com.esotericsoftware.kryo.Kryo import com.google.common.primitives.Ints import net.corda.core.crypto.* -import net.corda.core.messaging.Ack +import net.corda.node.services.messaging.Ack import net.corda.node.services.persistence.NodeAttachmentService import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy diff --git a/docs/source/messaging.rst b/docs/source/messaging.rst index 9823df1f50..a62d6802c7 100644 --- a/docs/source/messaging.rst +++ b/docs/source/messaging.rst @@ -109,12 +109,3 @@ the validated user is the username itself and the RPC framework uses this to det The broker also does host verification when connecting to another peer. It checks that the TLS certificate common name matches with the advertised legal name from the network map service. - -Messaging types ---------------- - -Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``. -An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the -messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are -identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future -versions perhaps a routing path through the network. diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 5bc8deafaa..9ee1c38a8d 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -5,16 +5,16 @@ import com.google.common.util.concurrent.ListenableFuture import net.corda.core.* import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.messaging.createMessage import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.ServiceInfo import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.* -import net.corda.flows.ServiceRequestMessage -import net.corda.flows.sendRequest import net.corda.node.internal.Node +import net.corda.node.services.messaging.ServiceRequestMessage +import net.corda.node.services.messaging.createMessage +import net.corda.node.services.messaging.sendRequest import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.utilities.ServiceIdentityGenerator diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index c44e697caf..df615e7165 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -8,9 +8,9 @@ import net.corda.core.node.NodeInfo import net.corda.core.random63BitValue import net.corda.core.seconds import net.corda.core.utilities.BOB -import net.corda.flows.sendRequest import net.corda.node.internal.NetworkMapInfo import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.messaging.sendRequest import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NodeRegistration diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ab08631366..6b65d9444c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -32,6 +32,8 @@ import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.keys.PersistentKeyManagementService +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.messaging.sendRequest import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.RegistrationResponse @@ -112,8 +114,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, protected val partyKeys = mutableSetOf() val services = object : ServiceHubInternal() { - override val networkService: MessagingServiceInternal get() = net - override val networkMapCache: NetworkMapCache get() = netMapCache + override val networkService: MessagingService get() = net + override val networkMapCache: NetworkMapCacheInternal get() = netMapCache override val storageService: TxWritableStorageService get() = storage override val vaultService: VaultService get() = vault override val keyManagementService: KeyManagementService get() = keyManagement @@ -162,8 +164,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, var inNodeNetworkMapService: NetworkMapService? = null lateinit var txVerifierService: TransactionVerifierService lateinit var identity: IdentityService - lateinit var net: MessagingServiceInternal - lateinit var netMapCache: NetworkMapCache + lateinit var net: MessagingService + lateinit var netMapCache: NetworkMapCacheInternal lateinit var scheduler: NodeSchedulerService lateinit var flowLogicFactory: FlowLogicRefFactory lateinit var schemas: SchemaService @@ -528,7 +530,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, runOnStop.clear() } - protected abstract fun makeMessagingService(): MessagingServiceInternal + protected abstract fun makeMessagingService(): MessagingService protected abstract fun startMessagingService(rpcOps: RPCOps) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 7242ce1f08..1063d2b6ca 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -19,9 +19,9 @@ import net.corda.node.printBasicNodeInfo import net.corda.node.serialization.NodeClock import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl -import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftNonValidatingNotaryService @@ -109,7 +109,7 @@ class Node(override val configuration: FullNodeConfiguration, private lateinit var userService: RPCUserService - override fun makeMessagingService(): MessagingServiceInternal { + override fun makeMessagingService(): MessagingService { userService = RPCUserServiceImpl(configuration.rpcUsers) val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null diff --git a/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt index 633dc2ddca..13edf41170 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt @@ -1,13 +1,10 @@ package net.corda.node.services.api -import net.corda.core.messaging.Message -import net.corda.core.messaging.MessageHandlerRegistration -import net.corda.core.messaging.createMessage import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize -import net.corda.flows.ServiceRequestMessage +import net.corda.node.services.messaging.* import javax.annotation.concurrent.ThreadSafe /** @@ -16,7 +13,7 @@ import javax.annotation.concurrent.ThreadSafe @ThreadSafe abstract class AbstractNodeService(val services: ServiceHubInternal) : SingletonSerializeAsToken() { - val net: MessagingServiceInternal get() = services.networkService + val net: MessagingService get() = services.networkService /** * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index acff817c4e..7320b8ffcf 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -6,38 +6,56 @@ import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.flows.FlowStateMachine -import net.corda.core.messaging.MessagingService +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.NodeInfo import net.corda.core.node.PluginServiceHub +import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.TxWritableStorageService +import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.loggerFor import net.corda.node.internal.ServiceFlowInfo +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.statemachine.FlowStateMachineImpl -interface MessagingServiceInternal : MessagingService { +interface NetworkMapCacheInternal : NetworkMapCache { /** - * Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor - * then this will block until all in-flight messages have finished being handled and acknowledged. If called - * from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor, - * it returns immediately and shutdown is asynchronous. + * Deregister from updates from the given map service. + * @param net the network messaging service. + * @param service the network map service to fetch current state from. */ - fun stop() + fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture + + /** + * Add a network map service; fetches a copy of the latest map from the service and subscribes to any further + * updates. + * @param net the network messaging service. + * @param networkMapAddress the network map service to fetch current state from. + * @param subscribe if the cache should subscribe to updates. + * @param ifChangedSinceVer an optional version number to limit updating the map based on. If the latest map + * version is less than or equal to the given version, no update is fetched. + */ + fun addMapService(net: MessagingService, networkMapAddress: SingleMessageRecipient, + subscribe: Boolean, ifChangedSinceVer: Int? = null): ListenableFuture + + /** Adds a node to the local cache (generally only used for adding ourselves). */ + fun addNode(node: NodeInfo) + + /** Removes a node from the local cache. */ + fun removeNode(node: NodeInfo) + + /** For testing where the network map cache is manipulated marks the service as immediately ready. */ + @VisibleForTesting + fun runWithoutMapService() + } -/** - * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods - * on the messaging service interface until you have successfully started up the system. One of these objects should - * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations - * may let you cast the returned future to an object that lets you get status info. - * - * A specific implementation of the controller class will have extra features that let you customise it before starting - * it up. - */ -interface MessagingServiceBuilder { - fun start(): ListenableFuture +@CordaSerializable +sealed class NetworkCacheError : Exception() { + /** Indicates a failure to deregister, because of a rejected request from the remote node */ + class DeregistrationFailed : NetworkCacheError() } - abstract class ServiceHubInternal : PluginServiceHub { companion object { private val log = loggerFor() @@ -46,8 +64,9 @@ abstract class ServiceHubInternal : PluginServiceHub { abstract val monitoringService: MonitoringService abstract val flowLogicRefFactory: FlowLogicRefFactory abstract val schemaService: SchemaService + abstract override val networkMapCache: NetworkMapCacheInternal - abstract override val networkService: MessagingServiceInternal + abstract val networkService: MessagingService /** * Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt new file mode 100644 index 0000000000..8699fc0257 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -0,0 +1,232 @@ +package net.corda.node.services.messaging + +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import net.corda.core.catch +import net.corda.core.messaging.MessageRecipients +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.services.DEFAULT_SESSION_ID +import net.corda.core.node.services.PartyInfo +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.DeserializeAsKotlinObjectDef +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import org.bouncycastle.asn1.x500.X500Name +import java.time.Instant +import java.util.* +import java.util.concurrent.atomic.AtomicBoolean +import javax.annotation.concurrent.ThreadSafe + +/** + * A [MessagingService] sits at the boundary between a message routing / networking layer and the core platform code. + * + * A messaging system must provide the ability to send 1:many messages, potentially to an abstract "group", the + * membership of which is defined elsewhere. Messages are atomic and the system guarantees that a sent message + * _eventually_ will arrive in the exact form it was sent, however, messages can be arbitrarily re-ordered or delayed. + * + * Example implementations might be a custom P2P layer, Akka, Apache Kafka, etc. It is assumed that the message layer + * is *reliable* and as such messages may be stored to disk once queued. + */ +@ThreadSafe +interface MessagingService { + /** + * The provided function will be invoked for each received message whose topic matches the given string. The callback + * will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result. + * + * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. + * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister + * itself and yet addMessageHandler hasn't returned the handle yet. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). + * @param sessionID identifier for the session the message is part of. For services listening before + * a session is established, use [DEFAULT_SESSION_ID]. + */ + fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + + /** + * The provided function will be invoked for each received message whose topic and session matches. The callback + * will run on the main server thread provided when the messaging service is constructed, and a database + * transaction is set up for you automatically. + * + * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. + * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister + * itself and yet addMessageHandler hasn't returned the handle yet. + * + * @param topicSession identifier for the topic and session to listen for messages arriving on. + */ + fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + + /** + * Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once + * this method has returned, although executions that are currently in flight will not be interrupted. + * + * @throws IllegalArgumentException if the given registration isn't valid for this messaging service. + * @throws IllegalStateException if the given registration was already de-registered. + */ + fun removeMessageHandler(registration: MessageHandlerRegistration) + + /** + * Sends a message to the given receiver. The details of how receivers are identified is up to the messaging + * implementation: the type system provides an opaque high level view, with more fine grained control being + * available via type casting. Once this function returns the message is queued for delivery but not necessarily + * delivered: if the recipients are offline then the message could be queued hours or days later. + * + * There is no way to know if a message has been received. If your flow requires this, you need the recipient + * to send an ACK message back. + */ + fun send(message: Message, target: MessageRecipients) + + /** + * Returns an initialised [Message] with the current time, etc, already filled in. + * + * @param topicSession identifier for the topic and session the message is sent to. + */ + fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID = UUID.randomUUID()): Message + + /** Given information about either a specific node or a service returns its corresponding address */ + fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients + + /** Returns an address that refers to this node. */ + val myAddress: SingleMessageRecipient + + /** + * Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor + * then this will block until all in-flight messages have finished being handled and acknowledged. If called + * from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor, + * it returns immediately and shutdown is asynchronous. + */ + fun stop() +} + +/** + * Returns an initialised [Message] with the current time, etc, already filled in. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * Must not be blank. + * @param sessionID identifier for the session the message is part of. For messages sent to services before the + * construction of a session, use [DEFAULT_SESSION_ID]. + */ +fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESSION_ID, data: ByteArray): Message + = createMessage(TopicSession(topic, sessionID), data) + +/** + * Registers a handler for the given topic and session ID that runs the given callback with the message and then removes + * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback + * doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler], as the handler is + * automatically deregistered before the callback runs. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). + * @param sessionID identifier for the session the message is part of. For services listening before + * a session is established, use [DEFAULT_SESSION_ID]. + */ +fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit) + = runOnNextMessage(TopicSession(topic, sessionID), callback) + +/** + * Registers a handler for the given topic and session that runs the given callback with the message and then removes + * itself. This is useful for one-shot handlers that aren't supposed to stick around permanently. Note that this callback + * doesn't take the registration object, unlike the callback to [MessagingService.addMessageHandler]. + * + * @param topicSession identifier for the topic and session to listen for messages arriving on. + */ +inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) { + val consumed = AtomicBoolean() + addMessageHandler(topicSession) { msg, reg -> + removeMessageHandler(reg) + check(!consumed.getAndSet(true)) { "Called more than once" } + check(msg.topicSession == topicSession) { "Topic/session mismatch: ${msg.topicSession} vs $topicSession" } + callback(msg) + } +} + +/** + * Returns a [ListenableFuture] of the next message payload ([Message.data]) which is received on the given topic and sessionId. + * The payload is deserialized to an object of type [M]. Any exceptions thrown will be captured by the future. + */ +fun MessagingService.onNext(topic: String, sessionId: Long): ListenableFuture { + val messageFuture = SettableFuture.create() + runOnNextMessage(topic, sessionId) { message -> + messageFuture.catch { + message.data.deserialize() + } + } + return messageFuture +} + +fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) + = send(TopicSession(topic, sessionID), payload, to, uuid) + +fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) + = send(createMessage(topicSession, payload.serialize().bytes, uuid), to) + +/** + * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods + * on the messaging service interface until you have successfully started up the system. One of these objects should + * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations + * may let you cast the returned future to an object that lets you get status info. + * + * A specific implementation of the controller class will have extra features that let you customise it before starting + * it up. + */ +interface MessagingServiceBuilder { + fun start(): ListenableFuture +} + +interface MessageHandlerRegistration + +/** + * An identifier for the endpoint [MessagingService] message handlers listen at. + * + * @param topic identifier for the general subject of the message, for example "platform.network_map.fetch". + * The topic can be the empty string to match all messages (session ID must be [DEFAULT_SESSION_ID]). + * @param sessionID identifier for the session the message is part of. For services listening before + * a session is established, use [DEFAULT_SESSION_ID]. + */ +@CordaSerializable +data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) { + fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID + override fun toString(): String = "$topic.$sessionID" +} + +/** + * A message is defined, at this level, to be a (topic, timestamp, byte arrays) triple, where the topic is a string in + * Java-style reverse dns form, with "platform." being a prefix reserved by the platform for its own use. Vendor + * specific messages can be defined, but use your domain name as the prefix e.g. "uk.co.bigbank.messages.SomeMessage". + * + * The debugTimestamp field is intended to aid in tracking messages as they flow across the network, likewise, the + * message ID is intended to be an ad-hoc way to identify a message sent in the system through debug logs and so on. + * These IDs and timestamps should not be assumed to be globally unique, although due to the nanosecond precision of + * the timestamp field they probably will be, even if an implementation just uses a hash prefix as the message id. + */ +interface Message { + val topicSession: TopicSession + val data: ByteArray + val debugTimestamp: Instant + val uniqueMessageId: UUID +} + +// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that. +// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC, +// or something like that. +interface ReceivedMessage : Message { + /** The authenticated sender. */ + val peer: X500Name + /** Platform version of the sender's node. */ + val platformVersion: Int +} + +/** A singleton that's useful for validating topic strings */ +object TopicStringValidator { + private val regex = "[a-zA-Z0-9.]+".toPattern() + /** @throws IllegalArgumentException if the given topic contains invalid characters */ + fun check(tag: String) = require(regex.matcher(tag).matches()) +} + +/** + * A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement + * from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response. + */ +@CordaSerializable +object Ack : DeserializeAsKotlinObjectDef diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index e4683ca31b..92ae5b644a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -3,7 +3,10 @@ package net.corda.node.services.messaging import com.google.common.net.HostAndPort import com.google.common.util.concurrent.ListenableFuture import net.corda.core.ThreadBox -import net.corda.core.messaging.* +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.MessageRecipients +import net.corda.core.messaging.RPCOps +import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.VersionInfo import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.TransactionVerifierService @@ -15,7 +18,6 @@ import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace import net.corda.node.services.RPCUserService -import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.VerifierType @@ -75,7 +77,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, val database: Database, val networkMapRegistrationFuture: ListenableFuture, val monitoringService: MonitoringService -) : ArtemisMessagingComponent(), MessagingServiceInternal { +) : ArtemisMessagingComponent(), MessagingService { companion object { private val log = loggerFor() diff --git a/core/src/main/kotlin/net/corda/flows/ServiceRequestMessage.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt similarity index 85% rename from core/src/main/kotlin/net/corda/flows/ServiceRequestMessage.kt rename to node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt index 95db4bfd9b..35ce5218d1 100644 --- a/core/src/main/kotlin/net/corda/flows/ServiceRequestMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ServiceRequestMessage.kt @@ -1,7 +1,8 @@ -package net.corda.flows +package net.corda.node.services.messaging import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.messaging.* +import net.corda.core.messaging.MessageRecipients +import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.serialization.CordaSerializable diff --git a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt index fcc13b5d90..89761813a3 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/InMemoryNetworkMapCache.kt @@ -6,20 +6,20 @@ import com.google.common.util.concurrent.SettableFuture import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.Party import net.corda.core.map -import net.corda.core.messaging.MessagingService import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.messaging.createMessage import net.corda.core.node.NodeInfo import net.corda.core.node.services.DEFAULT_SESSION_ID -import net.corda.core.node.services.NetworkCacheError -import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor -import net.corda.flows.sendRequest +import net.corda.node.services.api.NetworkCacheError +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.services.messaging.MessagingService +import net.corda.node.services.messaging.createMessage +import net.corda.node.services.messaging.sendRequest import net.corda.node.services.network.NetworkMapService.FetchMapResponse import net.corda.node.services.network.NetworkMapService.SubscribeResponse import net.corda.node.utilities.AddOrRemove @@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe * Extremely simple in-memory cache of the network map. */ @ThreadSafe -open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCache { +open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCacheInternal { companion object { val logger = loggerFor() } diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt index ed15e45ef4..3610931e43 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapService.kt @@ -3,10 +3,8 @@ package net.corda.node.services.network import com.google.common.annotations.VisibleForTesting import net.corda.core.ThreadBox import net.corda.core.crypto.* -import net.corda.core.messaging.MessageHandlerRegistration import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.messaging.createMessage import net.corda.core.node.NodeInfo import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.NetworkMapCache @@ -17,9 +15,11 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor -import net.corda.flows.ServiceRequestMessage import net.corda.node.services.api.AbstractNodeService import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.messaging.MessageHandlerRegistration +import net.corda.node.services.messaging.ServiceRequestMessage +import net.corda.node.services.messaging.createMessage import net.corda.node.services.network.NetworkMapService.* import net.corda.node.services.network.NetworkMapService.Companion.FETCH_TOPIC import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_TOPIC diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 3baa199eed..a43228337c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -18,9 +18,6 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.crypto.commonName import net.corda.core.flows.* -import net.corda.core.messaging.ReceivedMessage -import net.corda.core.messaging.TopicSession -import net.corda.core.messaging.send import net.corda.core.serialization.* import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor @@ -29,6 +26,9 @@ import net.corda.node.internal.ServiceFlowInfo import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.messaging.ReceivedMessage +import net.corda.node.services.messaging.TopicSession +import net.corda.node.services.messaging.send import net.corda.node.utilities.* import org.apache.activemq.artemis.utils.ReusableLatch import org.jetbrains.exposed.sql.Database diff --git a/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt b/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt index 187e4c4cd7..e62045e315 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt @@ -1,10 +1,10 @@ package net.corda.node.messaging -import net.corda.core.messaging.Message -import net.corda.core.messaging.TopicStringValidator -import net.corda.core.messaging.createMessage import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.ServiceInfo +import net.corda.node.services.messaging.Message +import net.corda.node.services.messaging.TopicStringValidator +import net.corda.node.services.messaging.createMessage import net.corda.node.services.network.NetworkMapService import net.corda.testing.node.MockNetwork import org.junit.Test diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index a89f2d05a4..959118275d 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -11,10 +11,11 @@ import net.corda.core.node.services.* import net.corda.core.transactions.SignedTransaction import net.corda.node.internal.ServiceFlowInfo import net.corda.node.serialization.NodeClock -import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.api.MonitoringService +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.api.SchemaService import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.InMemoryTransactionVerifierService @@ -26,10 +27,10 @@ import java.time.Clock open class MockServiceHubInternal( val customVault: VaultService? = null, val keyManagement: KeyManagementService? = null, - val net: MessagingServiceInternal? = null, + val net: MessagingService? = null, val identity: IdentityService? = MOCK_IDENTITY_SERVICE, val storage: TxWritableStorageService? = MockStorageService(), - val mapCache: NetworkMapCache? = MockNetworkMapCache(), + val mapCache: NetworkMapCacheInternal? = MockNetworkMapCache(), val scheduler: SchedulerService? = null, val overrideClock: Clock? = NodeClock(), val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(), @@ -44,9 +45,9 @@ open class MockServiceHubInternal( get() = keyManagement ?: throw UnsupportedOperationException() override val identityService: IdentityService get() = identity ?: throw UnsupportedOperationException() - override val networkService: MessagingServiceInternal + override val networkService: MessagingService get() = net ?: throw UnsupportedOperationException() - override val networkMapCache: NetworkMapCache + override val networkMapCache: NetworkMapCacheInternal get() = mapCache ?: throw UnsupportedOperationException() override val storageService: StorageService get() = storage ?: throw UnsupportedOperationException() diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 0e95ba7337..8365e53de6 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -5,12 +5,8 @@ import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture -import com.typesafe.config.ConfigFactory.empty -import net.corda.core.crypto.X509Utilities import net.corda.core.crypto.generateKeyPair -import net.corda.core.messaging.Message import net.corda.core.messaging.RPCOps -import net.corda.core.messaging.createMessage import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.utilities.ALICE import net.corda.core.utilities.LogHelper diff --git a/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt index d62b704bb5..85c2f85a76 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/AbstractNetworkMapServiceTest.kt @@ -1,16 +1,19 @@ package net.corda.node.services.network import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.crypto.X509Utilities import net.corda.core.getOrThrow import net.corda.core.messaging.SingleMessageRecipient -import net.corda.core.messaging.send import net.corda.core.node.NodeInfo import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.ServiceInfo import net.corda.core.serialization.deserialize -import net.corda.flows.sendRequest +import net.corda.core.utilities.ALICE +import net.corda.core.utilities.BOB +import net.corda.core.utilities.CHARLIE +import net.corda.core.utilities.DUMMY_MAP import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.messaging.send +import net.corda.node.services.messaging.sendRequest import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Added import net.corda.node.services.network.AbstractNetworkMapServiceTest.Changed.Removed import net.corda.node.services.network.NetworkMapService.* @@ -20,18 +23,12 @@ import net.corda.node.services.network.NetworkMapService.Companion.PUSH_TOPIC import net.corda.node.services.network.NetworkMapService.Companion.QUERY_TOPIC import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_TOPIC import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_TOPIC -import net.corda.node.services.network.NodeRegistration -import net.corda.core.utilities.ALICE -import net.corda.core.utilities.BOB -import net.corda.core.utilities.CHARLIE -import net.corda.core.utilities.DUMMY_MAP import net.corda.node.utilities.AddOrRemove import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AddOrRemove.REMOVE import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode import org.assertj.core.api.Assertions.assertThat -import org.bouncycastle.asn1.x500.X500Name import org.eclipse.jetty.util.BlockingArrayQueue import org.junit.After import org.junit.Before diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt index f660b6a102..f226b0bdef 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt @@ -15,7 +15,6 @@ import net.corda.core.contracts.StateRef import net.corda.core.crypto.AnonymousParty import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic -import net.corda.core.messaging.Ack import net.corda.core.node.PluginServiceHub import net.corda.core.node.services.dealsWith import net.corda.core.serialization.CordaSerializable @@ -24,6 +23,7 @@ import net.corda.core.utilities.unwrap import net.corda.flows.AbstractStateReplacementFlow.Proposal import net.corda.flows.StateReplacementException import net.corda.flows.TwoPartyDealFlow +import net.corda.node.services.messaging.Ack import net.corda.vega.analytics.* import net.corda.vega.contracts.* import net.corda.vega.portfolio.Portfolio diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 33f1bcb25d..59a76536eb 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -6,14 +6,16 @@ import com.google.common.util.concurrent.SettableFuture import net.corda.core.ThreadBox import net.corda.core.crypto.X509Utilities import net.corda.core.getOrThrow -import net.corda.core.messaging.* +import net.corda.core.messaging.AllPossibleRecipients +import net.corda.core.messaging.MessageRecipientGroup +import net.corda.core.messaging.MessageRecipients +import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.ServiceEntry import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.trace -import net.corda.node.services.api.MessagingServiceBuilder -import net.corda.node.services.api.MessagingServiceInternal +import net.corda.node.services.messaging.* import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JDBCHashSet import net.corda.node.utilities.transaction @@ -298,7 +300,7 @@ class InMemoryMessagingNetwork( inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val peerHandle: PeerHandle, private val executor: AffinityExecutor, - private val database: Database) : SingletonSerializeAsToken(), MessagingServiceInternal { + private val database: Database) : SingletonSerializeAsToken(), MessagingService { inner class Handler(val topicSession: TopicSession, val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 6ecba4a8f4..f868df14b3 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -19,9 +19,9 @@ import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.loggerFor import net.corda.node.internal.AbstractNode import net.corda.node.internal.ServiceFlowInfo -import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.keys.E2ETestKeyManagementService +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService import net.corda.node.services.statemachine.flowVersion @@ -151,7 +151,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // We only need to override the messaging service here, as currently everything that hits disk does so // through the java.nio API which we are already mocking via Jimfs. - override fun makeMessagingService(): MessagingServiceInternal { + override fun makeMessagingService(): MessagingService { require(id >= 0) { "Node ID must be zero or positive, was passed: " + id } return mockNet.messagingNetwork.createNodeWithID( !mockNet.threadPerNode, diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index f434f7750a..aedd0a14f1 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -4,7 +4,6 @@ import net.corda.core.contracts.Attachment import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.* import net.corda.core.flows.StateMachineRunId -import net.corda.core.messaging.MessagingService import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub @@ -58,7 +57,6 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub { override val keyManagementService: MockKeyManagementService = MockKeyManagementService(key) override val vaultService: VaultService get() = throw UnsupportedOperationException() - override val networkService: MessagingService get() = throw UnsupportedOperationException() override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException() override val clock: Clock get() = Clock.systemUTC() override val schedulerService: SchedulerService get() = throw UnsupportedOperationException()