Rename ArtemisMessagingClient to NodeMessagingClient to make the purpose clearer once we start landing the clientrpc framework.

This commit is contained in:
Mike Hearn 2016-09-12 13:16:08 +01:00
parent dab883dcba
commit 0e37547af0
8 changed files with 35 additions and 35 deletions

View File

@ -10,7 +10,7 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.services.config.FullNodeConfiguration import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapCache
@ -59,12 +59,12 @@ interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): Future<NodeInfo> fun startNode(providedName: String? = null, advertisedServices: Set<ServiceType> = setOf()): Future<NodeInfo>
/** /**
* Starts an [ArtemisMessagingClient]. * Starts an [NodeMessagingClient].
* *
* @param providedName name of the client, which will be used for creating its directory. * @param providedName name of the client, which will be used for creating its directory.
* @param serverAddress the artemis server to connect to, for example a [Node]. * @param serverAddress the artemis server to connect to, for example a [Node].
*/ */
fun startClient(providedName: String, serverAddress: HostAndPort): Future<ArtemisMessagingClient> fun startClient(providedName: String, serverAddress: HostAndPort): Future<NodeMessagingClient>
/** /**
* Starts a local [ArtemisMessagingServer] of which there may only be one. * Starts a local [ArtemisMessagingServer] of which there may only be one.
*/ */
@ -231,7 +231,7 @@ class DriverDSL(
class State { class State {
val registeredProcesses = LinkedList<Process>() val registeredProcesses = LinkedList<Process>()
val clients = LinkedList<ArtemisMessagingClient>() val clients = LinkedList<NodeMessagingClient>()
var localServer: ArtemisMessagingServer? = null var localServer: ArtemisMessagingServer? = null
} }
private val state = ThreadBox(State()) private val state = ThreadBox(State())
@ -324,7 +324,7 @@ class DriverDSL(
override fun startClient( override fun startClient(
providedName: String, providedName: String,
serverAddress: HostAndPort serverAddress: HostAndPort
): Future<ArtemisMessagingClient> { ): Future<NodeMessagingClient> {
val nodeConfiguration = NodeConfigurationFromConfig( val nodeConfiguration = NodeConfigurationFromConfig(
NodeConfiguration.loadConfig( NodeConfiguration.loadConfig(
@ -335,7 +335,7 @@ class DriverDSL(
) )
) )
) )
val client = ArtemisMessagingClient( val client = NodeMessagingClient(
Paths.get(baseDirectory, providedName), Paths.get(baseDirectory, providedName),
nodeConfiguration, nodeConfiguration,
serverHostPort = serverAddress, serverHostPort = serverAddress,
@ -344,7 +344,7 @@ class DriverDSL(
persistentInbox = false // Do not create a permanent queue for our transient UI identity persistentInbox = false // Do not create a permanent queue for our transient UI identity
) )
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingClient> { return Executors.newSingleThreadExecutor().submit(Callable<NodeMessagingClient> {
client.configureWithDevSSLCertificate() client.configureWithDevSSLCertificate()
client.start() client.start()
thread { client.run() } thread { client.run() }
@ -386,7 +386,7 @@ class DriverDSL(
override fun start() { override fun start() {
startNetworkMapService() startNetworkMapService()
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get() val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get()
val networkMapAddr = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress) val networkMapAddr = NodeMessagingClient.makeNetworkMapAddress(networkMapAddress)
networkMapCache.addMapService(networkMapClient, networkMapAddr, true) networkMapCache.addMapService(networkMapClient, networkMapAddr, true)
networkMapNodeInfo = poll("network map cache for $networkMapName") { networkMapNodeInfo = poll("network map cache for $networkMapName") {
networkMapCache.partyNodes.forEach { networkMapCache.partyNodes.forEach {

View File

@ -10,7 +10,7 @@ import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.FullNodeConfiguration import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.transactions.PersistentUniquenessProvider import com.r3corda.node.services.transactions.PersistentUniquenessProvider
import com.r3corda.node.servlets.AttachmentDownloadServlet import com.r3corda.node.servlets.AttachmentDownloadServlet
@ -118,9 +118,9 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
p2pAddr p2pAddr
}() }()
if (networkMapService != null) { if (networkMapService != null) {
return ArtemisMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread) return NodeMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread)
} else { } else {
return ArtemisMessagingClient(dir, configuration, serverAddr, null, serverThread) return NodeMessagingClient(dir, configuration, serverAddr, null, serverThread)
} }
} }
@ -134,7 +134,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
} }
// Start up the MQ client. // Start up the MQ client.
(net as ArtemisMessagingClient).apply { (net as NodeMessagingClient).apply {
configureWithDevSSLCertificate() // TODO: Client might need a separate certificate configureWithDevSSLCertificate() // TODO: Client might need a separate certificate
start() start()
} }
@ -266,7 +266,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
/** Starts a blocking event loop for message dispatch. */ /** Starts a blocking event loop for message dispatch. */
fun run() { fun run() {
(net as ArtemisMessagingClient).run() (net as NodeMessagingClient).run()
} }
// TODO: Do we really need setup? // TODO: Do we really need setup?

View File

@ -8,7 +8,7 @@ import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.serialization.NodeClock import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
import com.typesafe.config.Config import com.typesafe.config.Config
@ -116,7 +116,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
} }
} }
if (networkMapAddress == null) advertisedServices.add(NetworkMapService.Type) if (networkMapAddress == null) advertisedServices.add(NetworkMapService.Type)
val networkMapMessageAddress: SingleMessageRecipient? = if (networkMapAddress == null) null else ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress) val networkMapMessageAddress: SingleMessageRecipient? = if (networkMapAddress == null) null else NodeMessagingClient.makeNetworkMapAddress(networkMapAddress)
return Node(basedir.toAbsolutePath().normalize(), return Node(basedir.toAbsolutePath().normalize(),
artemisAddress, artemisAddress,
webAddress, webAddress,

View File

@ -23,11 +23,11 @@ import javax.annotation.concurrent.ThreadSafe
/** /**
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product. * This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
* Artemis is a message queue broker and here we run a client connecting to the specified broker instance * Artemis is a message queue broker and here we run a client connecting to the specified broker instance
* [ArtemisMessagingServer]. * [ArtemisMessagingServer]. It's primarily concerned with peer-to-peer messaging.
* *
* Message handlers are run on the provided [AffinityExecutor] synchronously, that is, the Artemis callback threads * Message handlers are run on the provided [AffinityExecutor] synchronously, that is, the Artemis callback threads
* are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given executor * are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given
* through into Artemis and from there, back through to senders. * executor through into Artemis and from there, back through to senders.
* *
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process) * @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
@ -37,14 +37,14 @@ import javax.annotation.concurrent.ThreadSafe
* If false the inbox queue will be transient, which is appropriate for UI clients for example. * If false the inbox queue will be transient, which is appropriate for UI clients for example.
*/ */
@ThreadSafe @ThreadSafe
class ArtemisMessagingClient(directory: Path, class NodeMessagingClient(directory: Path,
config: NodeConfiguration, config: NodeConfiguration,
val serverHostPort: HostAndPort, val serverHostPort: HostAndPort,
val myIdentity: PublicKey?, val myIdentity: PublicKey?,
val executor: AffinityExecutor, val executor: AffinityExecutor,
val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal { val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
companion object { companion object {
val log = loggerFor<ArtemisMessagingClient>() val log = loggerFor<NodeMessagingClient>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint

View File

@ -5,7 +5,7 @@ import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.Message
import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor
@ -36,7 +36,7 @@ class ArtemisMessagingTests {
override val trustStorePassword: String = "trustpass" override val trustStorePassword: String = "trustpass"
} }
var messagingClient: ArtemisMessagingClient? = null var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null var messagingServer: ArtemisMessagingServer? = null
val networkMapCache = InMemoryNetworkMapCache() val networkMapCache = InMemoryNetworkMapCache()
@ -103,8 +103,8 @@ class ArtemisMessagingTests {
assertNull(receivedMessages.poll(200, MILLISECONDS)) assertNull(receivedMessages.poll(200, MILLISECONDS))
} }
private fun createMessagingClient(server: HostAndPort = hostAndPort): ArtemisMessagingClient { private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, identity.public, AffinityExecutor.SAME_THREAD).apply { return NodeMessagingClient(temporaryFolder.newFolder().toPath(), config, server, identity.public, AffinityExecutor.SAME_THREAD).apply {
configureWithDevSSLCertificate() configureWithDevSSLCertificate()
messagingClient = this messagingClient = this
} }

View File

@ -24,7 +24,7 @@ import com.r3corda.node.internal.Node
import com.r3corda.demos.api.NodeInterestRates import com.r3corda.demos.api.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.testing.node.MockNetwork import com.r3corda.testing.node.MockNetwork
@ -388,7 +388,7 @@ fun runUploadRates(cliParams: CliParams.UploadRates) = runUploadRates(cliParams.
private fun createRecipient(addr: String): SingleMessageRecipient { private fun createRecipient(addr: String): SingleMessageRecipient {
val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT) val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT)
return ArtemisMessagingClient.makeNetworkMapAddress(hostAndPort) return NodeMessagingClient.makeNetworkMapAddress(hostAndPort)
} }
private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient): Node { private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient): Node {

View File

@ -11,7 +11,7 @@ import com.r3corda.core.utilities.LogHelper
import com.r3corda.demos.api.NodeInterestRates import com.r3corda.demos.api.NodeInterestRates
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.protocols.RatesFixProtocol import com.r3corda.protocols.RatesFixProtocol
import com.r3corda.testing.node.makeTestDataSourceProperties import com.r3corda.testing.node.makeTestDataSourceProperties
import joptsimple.OptionParser import joptsimple.OptionParser
@ -49,7 +49,7 @@ fun main(args: Array<String>) {
LogHelper.setLevel("+RatesFixDemo", "-org.apache.activemq") LogHelper.setLevel("+RatesFixDemo", "-org.apache.activemq")
val dir = Paths.get(options.valueOf(dirArg)) val dir = Paths.get(options.valueOf(dirArg))
val networkMapAddr = ArtemisMessagingClient.makeNetworkMapAddress(HostAndPort.fromString(options.valueOf(networkMapAddrArg))) val networkMapAddr = NodeMessagingClient.makeNetworkMapAddress(HostAndPort.fromString(options.valueOf(networkMapAddrArg)))
val fixOf: FixOf = NodeInterestRates.parseFixOf(options.valueOf(fixOfArg)) val fixOf: FixOf = NodeInterestRates.parseFixOf(options.valueOf(fixOfArg))
val expectedRate = BigDecimal(options.valueOf(expectedRateArg)) val expectedRate = BigDecimal(options.valueOf(expectedRateArg))

View File

@ -24,7 +24,7 @@ import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.config.NodeConfigurationFromConfig import com.r3corda.node.services.config.NodeConfigurationFromConfig
import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.persistence.NodeAttachmentService import com.r3corda.node.services.persistence.NodeAttachmentService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
@ -135,7 +135,7 @@ fun main(args: Array<String>) {
null null
} else { } else {
advertisedServices = emptySet() advertisedServices = emptySet()
ArtemisMessagingClient.makeNetworkMapAddress(theirNetAddr) NodeMessagingClient.makeNetworkMapAddress(theirNetAddr)
} }
// And now construct then start the node object. It takes a little while. // And now construct then start the node object. It takes a little while.