From 089ba2cb69ed4951991ae3208e3cbf3e06fe3c66 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Tue, 23 Aug 2016 10:47:51 +0100 Subject: [PATCH] Only NetworkMapServer addresses can be publicly manufactured. Use identity publick key as addressing, with only bridges using the HostAndPort information Fixup after rebase and fix issue with checking previous deployment of bridges Correct comments on ArtemisMessagingClient constructor Fixup rates fix demo Get rid of when statements Make NetworkMapCache send modify as well as add//remove events. Make inboxes for nodes persistent. Suppress warnings Fix message acknowledgement so that it actually consumes messages properly. Change queueName to SimpleString to stop lots of wasted conversions Get rid of spurious import Tidy up and add comments Update to include comments on PR Remove unnecessary import --- .../core/node/services/NetworkMapCache.kt | 4 +- .../kotlin/com/r3corda/node/driver/Driver.kt | 24 ++-- .../com/r3corda/node/driver/NodeRunner.kt | 2 +- .../com/r3corda/node/internal/AbstractNode.kt | 2 +- .../kotlin/com/r3corda/node/internal/Node.kt | 12 +- .../node/services/config/NodeConfiguration.kt | 2 +- .../messaging/ArtemisMessagingClient.kt | 78 +++++----- .../messaging/ArtemisMessagingComponent.kt | 68 ++++++++- .../messaging/ArtemisMessagingServer.kt | 133 ++++++++++++++---- .../network/InMemoryNetworkMapCache.kt | 12 +- .../com/r3corda/node/driver/DriverTests.kt | 9 +- .../node/services/ArtemisMessagingTests.kt | 12 +- scripts/get-rate-fix.sh | 2 +- src/main/kotlin/com/r3corda/demos/IRSDemo.kt | 2 +- .../kotlin/com/r3corda/demos/RateFixDemo.kt | 18 +-- .../kotlin/com/r3corda/demos/TraderDemo.kt | 2 +- 16 files changed, 276 insertions(+), 106 deletions(-) diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt index d909a8eb79..001ef52f50 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt @@ -20,8 +20,8 @@ interface NetworkMapCache { val logger = LoggerFactory.getLogger(NetworkMapCache::class.java) } - enum class MapChangeType { Added, Removed } - data class MapChange(val node: NodeInfo, val type: MapChangeType ) + enum class MapChangeType { Added, Removed, Modified } + data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType ) /** A list of nodes that advertise a network map service */ val networkMapNodes: List diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 6db3a1c656..8bccbc1540 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -63,9 +63,8 @@ interface DriverDSLExposedInterface { * * @param providedName name of the client, which will be used for creating its directory. * @param serverAddress the artemis server to connect to, for example a [Node]. - * @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null. */ - fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future + fun startClient(providedName: String, serverAddress: HostAndPort): Future /** * Starts a local [ArtemisMessagingServer] of which there may only be one. */ @@ -75,13 +74,12 @@ interface DriverDSLExposedInterface { } fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) = - startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort) + startClient("driver-local-server-client", localServer.myHostPort) fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) = startClient( providedName = providedName ?: "${remoteNodeInfo.identity.name}-client", - serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort, - clientAddress = null + serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address) ) interface DriverDSLInternalInterface : DriverDSLExposedInterface { @@ -224,6 +222,7 @@ class DriverDSL( private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() private var networkMapNodeInfo: NodeInfo? = null + private val identity = generateKeyPair() class State { val registeredProcesses = LinkedList() @@ -322,8 +321,7 @@ class DriverDSL( override fun startClient( providedName: String, - serverAddress: HostAndPort, - clientAddress: HostAndPort? + serverAddress: HostAndPort ): Future { val nodeConfiguration = NodeConfigurationFromConfig( @@ -339,8 +337,9 @@ class DriverDSL( Paths.get(baseDirectory, providedName), nodeConfiguration, serverHostPort = serverAddress, - myHostPort = clientAddress ?: serverAddress, - executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1) + myIdentity = identity.public, + executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1), + persistentInbox = false // Do not create a permanent queue for our transient UI identity ) return Executors.newSingleThreadExecutor().submit(Callable { @@ -368,7 +367,8 @@ class DriverDSL( val server = ArtemisMessagingServer( Paths.get(baseDirectory, name), config, - portAllocation.nextHostAndPort() + portAllocation.nextHostAndPort(), + networkMapCache ) return Executors.newSingleThreadExecutor().submit(Callable { server.configureWithDevSSLCertificate() @@ -383,11 +383,11 @@ class DriverDSL( override fun start() { startNetworkMapService() - val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get() + val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get() // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // the network map service itself. val fakeNodeInfo = NodeInfo( - address = ArtemisMessagingClient.makeRecipient(networkMapAddress), + address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress), identity = Party( name = networkMapName, owningKey = generateKeyPair().public diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 8a7904cce4..1dad9bebb2 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -35,7 +35,7 @@ class NodeRunner { val networkMapNodeInfo = if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) { NodeInfo( - address = ArtemisMessagingClient.makeRecipient(networkMapAddress), + address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress), identity = Party( name = networkMapName, owningKey = networkMapPublicKey diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index e5e949f8a2..89a899e264 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -164,8 +164,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val storageServices = initialiseStorageService(dir) storage = storageServices.first checkpointStorage = storageServices.second - net = makeMessagingService() netMapCache = InMemoryNetworkMapCache() + net = makeMessagingService() wallet = makeWalletService() identity = makeIdentityService() diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 245811f0fb..0b7828e04e 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -112,11 +112,16 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, override fun makeMessagingService(): MessagingServiceInternal { val serverAddr = messagingServerAddr ?: { - messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr) + messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache) p2pAddr }() - - return ArtemisMessagingClient(dir, configuration, serverAddr, p2pAddr, serverThread) + if (networkMapService != null) { + return ArtemisMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread) + } + else + { + return ArtemisMessagingClient(dir, configuration, serverAddr, null, serverThread) + } } override fun startMessagingService() { @@ -124,6 +129,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, messageBroker?.apply { configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process start() + bridgeToNetworkMapService(networkMapService) } // Start up the MQ client. diff --git a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt index ff91468a38..71c7361b2c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt @@ -112,7 +112,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { val clock: Clock = NodeClock() fun createNode(): Node { - val networkMapTarget = ArtemisMessagingClient.makeRecipient(mapService.address) + val networkMapTarget = ArtemisMessagingClient.makeNetworkMapAddress(mapService.address) val advertisedServices = mutableSetOf() if (mapService.hostServiceLocally) advertisedServices.add(NetworkMapService.Type) if (hostNotaryServiceLocally) advertisedServices.add(SimpleNotaryService.Type) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt index 658b1aa2e2..aecefec74f 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt @@ -5,7 +5,6 @@ import com.r3corda.core.ThreadBox import com.r3corda.core.messaging.* import com.r3corda.core.serialization.opaque import com.r3corda.core.utilities.loggerFor -import com.r3corda.node.internal.Node import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.utilities.AffinityExecutor @@ -14,8 +13,8 @@ import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* import java.nio.file.FileSystems import java.nio.file.Path +import java.security.PublicKey import java.time.Instant -import java.util.* import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.Executor @@ -31,14 +30,19 @@ import javax.annotation.concurrent.ThreadSafe * through into Artemis and from there, back through to senders. * * @param serverHostPort The address of the broker instance to connect to (might be running in the same process) - * @param myHostPort What host and port to use as an address for incoming messages + * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate + * that this is a NetworkMapService node which will be bound globally to the name "networkmap" + * @param executor An executor to run received message tasks upon. + * @param persistentInbox If true the inbox will be created persistent if not already created. + * If false the inbox queue will be transient, which is appropriate for UI clients for example. */ @ThreadSafe class ArtemisMessagingClient(directory: Path, config: NodeConfiguration, val serverHostPort: HostAndPort, - val myHostPort: HostAndPort, - val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal { + val myIdentity: PublicKey?, + val executor: AffinityExecutor, + val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal { companion object { val log = loggerFor() @@ -50,17 +54,19 @@ class ArtemisMessagingClient(directory: Path, val SESSION_ID_PROPERTY = "session-id" - /** Temp helper until network map is established. */ - fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort) - - fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname)) - fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT) + /** + * This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node. + * All other addresses come from the NetworkMapCache, or myAddress below. + * The node will populate with their own identity based address when they register with the NetworkMapService. + */ + fun makeNetworkMapAddress(hostAndPort: HostAndPort): SingleMessageRecipient = NetworkMapAddress(hostAndPort) } private class InnerState { var started = false var running = false - val producers = HashMap() + val knownQueues = mutableSetOf() + var producer: ClientProducer? = null var consumer: ClientConsumer? = null var session: ClientSession? = null var clientFactory: ClientSessionFactory? = null @@ -71,7 +77,10 @@ class ArtemisMessagingClient(directory: Path, val topicSession: TopicSession, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration - override val myAddress: SingleMessageRecipient = Address(myHostPort) + /** + * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. + */ + override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort) private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() @@ -94,14 +103,20 @@ class ArtemisMessagingClient(directory: Path, val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) clientFactory = locator.createSessionFactory() - // Create a queue on which to receive messages and set up the handler. - val session = clientFactory!!.createSession() + // Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!) + val session = clientFactory!!.createSession(true, true, 1) this.session = session - val address = myHostPort.toString() - val queueName = myHostPort.toString() - session.createQueue(address, queueName, false) + // Create a queue on which to receive messages and set up the handler. + val queueName = toQueueName(myAddress) + val query = session.queueQuery(queueName) + if (!query.isExists) { + session.createQueue(queueName, queueName, persistentInbox) + } + knownQueues.add(queueName) consumer = session.createConsumer(queueName) + producer = session.createProducer() + session.start() } } @@ -166,6 +181,7 @@ class ArtemisMessagingClient(directory: Path, } val topic = message.getStringProperty(TOPIC_PROPERTY) val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) + log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID") val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } @@ -243,9 +259,10 @@ class ArtemisMessagingClient(directory: Path, shutdownLatch.await() } state.locked { - for (producer in producers.values) producer.close() - producers.clear() - + producer?.close() + producer = null + // Ensure any trailing messages are committed to the journal + session!!.commit() // Closing the factory closes all the sessions it produced as well. clientFactory!!.close() clientFactory = null @@ -253,9 +270,7 @@ class ArtemisMessagingClient(directory: Path, } override fun send(message: Message, target: MessageRecipients) { - if (target !is Address) - TODO("Only simple sends to single recipients are currently implemented") - + val queueName = toQueueName(target) state.locked { val artemisMessage = session!!.createMessage(true).apply { val sessionID = message.topicSession.sessionID @@ -264,21 +279,20 @@ class ArtemisMessagingClient(directory: Path, writeBodyBufferBytes(message.data) } - val producer = producers.getOrPut(target) { - if (target != myAddress) - maybeCreateQueue(target.hostAndPort) - session!!.createProducer(target.hostAndPort.toString()) + if (knownQueues.add(queueName)) { + maybeCreateQueue(queueName) } - producer.send(artemisMessage) + log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID}") + producer!!.send(queueName, artemisMessage) } } - private fun maybeCreateQueue(hostAndPort: HostAndPort) { + private fun maybeCreateQueue(queueName: SimpleString) { state.alreadyLocked { - val name = hostAndPort.toString() - val queueQuery = session!!.queueQuery(SimpleString(name)) + val queueQuery = session!!.queueQuery(queueName) if (!queueQuery.isExists) { - session!!.createQueue(name, name, true /* durable */) + log.info("create client queue $queueName") + session!!.createQueue(queueName, queueName, true /* durable */) } } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt index 39231d4e09..dda497d869 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -2,15 +2,20 @@ package com.r3corda.node.services.messaging import com.google.common.net.HostAndPort import com.r3corda.core.crypto.X509Utilities +import com.r3corda.core.crypto.parsePublicKeyBase58 +import com.r3corda.core.crypto.toBase58String +import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import java.nio.file.Files import java.nio.file.Path +import java.security.PublicKey /** * The base class for Artemis services that defines shared data structures and transport configuration @@ -22,8 +27,69 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks") private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks") + + companion object { + const val PEERS_PREFIX = "peers." + + @JvmStatic + protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap") + + /** + * Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should + * only be used in unit tests and the internals of the messaging services to keep addressing opaque for the future. + * N.B. Marked as JvmStatic to allow use in the inherited classes. + */ + @JvmStatic + internal fun toHostAndPort(target: MessageRecipients): HostAndPort { + val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") + return addr.hostAndPort + } + + /** + * Assuming the passed in target address is actually an ArtemisAddress will extract the queue name used. + * For now the queue name is the Base58 version of the node's identity. + * This should only be used in the internals of the messaging services to keep addressing opaque for the future. + * N.B. Marked as JvmStatic to allow use in the inherited classes. + */ + @JvmStatic + protected fun toQueueName(target: MessageRecipients): SimpleString { + val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") + return addr.queueName + + } + } + + protected interface ArtemisAddress { + val queueName: SimpleString + val hostAndPort: HostAndPort + } + + protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { + override val queueName: SimpleString = NETWORK_MAP_ADDRESS + } + // In future: can contain onion routing info, etc. - data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient + protected data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { + override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) } + + override fun toString(): String { + return "NodeAddress(identity = $queueName, $hostAndPort" + } + } + + protected fun tryParseKeyFromQueueName(queueName: SimpleString): PublicKey? { + val name = queueName.toString() + if(!name.startsWith(PEERS_PREFIX)) { + return null + } + val keyCode = name.substring(PEERS_PREFIX.length) + return try { + parsePublicKeyBase58(keyCode) + } catch (ex: Exception) { + null + } + + } protected enum class ConnectionDirection { INBOUND, OUTBOUND } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt index 9012cf5141..08542ca89a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt @@ -3,9 +3,11 @@ package com.r3corda.node.services.messaging import com.google.common.net.HostAndPort import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.newSecureRandom -import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.utilities.loggerFor import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl @@ -15,6 +17,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule +import rx.Subscription import java.math.BigInteger import java.nio.file.Path import javax.annotation.concurrent.ThreadSafe @@ -35,7 +38,8 @@ import javax.annotation.concurrent.ThreadSafe @ThreadSafe class ArtemisMessagingServer(directory: Path, config: NodeConfiguration, - val myHostPort: HostAndPort) : ArtemisMessagingComponent(directory, config) { + val myHostPort: HostAndPort, + val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent(directory, config) { companion object { val log = loggerFor() } @@ -44,22 +48,73 @@ class ArtemisMessagingServer(directory: Path, var running = false } - val myAddress: SingleMessageRecipient = Address(myHostPort) private val mutex = ThreadBox(InnerState()) private lateinit var activeMQServer: ActiveMQServer + private var networkChangeHandle: Subscription? = null fun start() = mutex.locked { if (!running) { configureAndStartServer() + networkChangeHandle = networkMapCache.changed.subscribe { onNetworkChange(it) } running = true } } fun stop() = mutex.locked { + networkChangeHandle?.unsubscribe() + networkChangeHandle = null activeMQServer.stop() running = false } + fun bridgeToNetworkMapService(networkMapService: NodeInfo?) { + if ((networkMapService != null) && (networkMapService.address is NetworkMapAddress)) { + val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS) + if (!query.isExists) { + activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false) + } + + maybeDeployBridgeForAddress(NETWORK_MAP_ADDRESS, networkMapService) + } + } + + private fun onNetworkChange(change: NetworkMapCache.MapChange) { + val address = change.node.address + if (address is ArtemisMessagingComponent.ArtemisAddress) { + val queueName = address.queueName + when (change.type) { + NetworkMapCache.MapChangeType.Added -> { + val query = activeMQServer.queueQuery(queueName) + if (query.isExists) { + // Queue exists so now wire up bridge + maybeDeployBridgeForAddress(queueName, change.node) + } + } + + NetworkMapCache.MapChangeType.Modified -> { + (change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let { + // remove any previous possibly different bridge + maybeDestroyBridge(it.queueName) + } + val query = activeMQServer.queueQuery(queueName) + if (query.isExists) { + // Deploy new bridge + maybeDeployBridgeForAddress(queueName, change.node) + } + } + + NetworkMapCache.MapChangeType.Removed -> { + (change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let { + // Remove old bridge + maybeDestroyBridge(it.queueName) + } + // just in case of NetworkMapCache version issues + maybeDestroyBridge(queueName) + } + } + } + } + private fun configureAndStartServer() { val config = createArtemisConfig(directory, myHostPort).apply { securityRoles = mapOf( @@ -74,8 +129,16 @@ class ArtemisMessagingServer(directory: Path, registerActivationFailureListener { exception -> throw exception } // Deploy bridge for a newly created queue registerPostQueueCreationCallback { queueName -> - log.trace("Queue created: $queueName") - maybeDeployBridgeForAddress(queueName.toString()) + log.info("Queue created: $queueName") + if (queueName != NETWORK_MAP_ADDRESS) { + val identity = tryParseKeyFromQueueName(queueName) + if (identity != null) { + val nodeInfo = networkMapCache.getNodeByPublicKey(identity) + if (nodeInfo != null) { + maybeDeployBridgeForAddress(queueName, nodeInfo) + } + } + } } } activeMQServer.start() @@ -102,35 +165,53 @@ class ArtemisMessagingServer(directory: Path, return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig) } + fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations + + fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration( + hostAndPort.toString(), + tcpTransport( + ConnectionDirection.OUTBOUND, + hostAndPort.hostText, + hostAndPort.port + ) + ) + + fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString()) + + fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) = activeMQServer.deployBridge(BridgeConfiguration().apply { + val nameStr = name.toString() + setName(nameStr) + queueName = nameStr + forwardingAddress = nameStr + staticConnectors = listOf(hostAndPort.toString()) + confirmationWindowSize = 100000 // a guess + }) + /** * For every queue created we need to have a bridge deployed in case the address of the queue * is that of a remote party */ - private fun maybeDeployBridgeForAddress(name: String) { - val hostAndPort = HostAndPort.fromString(name) + private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: NodeInfo) { + val hostAndPort = toHostAndPort(nodeInfo.address) - fun connectorExists() = name in activeMQServer.configuration.connectorConfigurations + if (hostAndPort == myHostPort) { + return + } - fun addConnector() = activeMQServer.configuration.addConnectorConfiguration( - name, - tcpTransport( - ConnectionDirection.OUTBOUND, - hostAndPort.hostText, - hostAndPort.port - ) - ) + if (!connectorExists(hostAndPort)) { + log.info("add connector $hostAndPort") + addConnector(hostAndPort) + } - fun deployBridge() = activeMQServer.deployBridge(BridgeConfiguration().apply { - setName(name) - queueName = name - forwardingAddress = name - staticConnectors = listOf(name) - confirmationWindowSize = 100000 // a guess - }) + if (!bridgeExists(name)) { + log.info("add bridge $hostAndPort $name") + deployBridge(hostAndPort, name) + } + } - if (!connectorExists()) { - addConnector() - deployBridge() + private fun maybeDestroyBridge(name: SimpleString) { + if (bridgeExists(name)) { + activeMQServer.destroyBridge(name.toString()) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 52c5e95944..b76dd04229 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -98,13 +98,17 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach } override fun addNode(node: NodeInfo) { - registeredNodes[node.identity] = node - _changed.onNext(MapChange(node, MapChangeType.Added)) + val oldValue = registeredNodes.put(node.identity, node) + if (oldValue == null) { + _changed.onNext(MapChange(node, oldValue, MapChangeType.Added)) + } else if(oldValue != node) { + _changed.onNext(MapChange(node, oldValue, MapChangeType.Modified)) + } } override fun removeNode(node: NodeInfo) { - registeredNodes.remove(node.identity) - _changed.onNext(MapChange(node, MapChangeType.Removed)) + val oldValue = registeredNodes.remove(node.identity) + _changed.onNext(MapChange(node, oldValue, MapChangeType.Removed)) } /** diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 9e88ac3967..4e5b421e1b 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -4,7 +4,6 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.messaging.ArtemisMessagingComponent -import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.SimpleNotaryService import org.junit.Test @@ -12,7 +11,7 @@ import org.junit.Test class DriverTests { companion object { fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) { - val address = nodeInfo.address as ArtemisMessagingComponent.Address + val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the node is registered in the network map poll("network map cache for $nodeName") { networkMapCache.get().firstOrNull { @@ -20,13 +19,13 @@ class DriverTests { } } // Check that the port is bound - addressMustBeBound(address.hostAndPort) + addressMustBeBound(hostAndPort) } fun nodeMustBeDown(nodeInfo: NodeInfo) { - val address = nodeInfo.address as ArtemisMessagingComponent.Address + val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the port is bound - addressMustNotBeBound(address.hostAndPort) + addressMustNotBeBound(hostAndPort) } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt index aab6e5587f..33f15192f3 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -1,12 +1,14 @@ package com.r3corda.node.services import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.messaging.Message import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.testing.freeLocalHostAndPort import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.ArtemisMessagingServer +import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.utilities.AffinityExecutor import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After @@ -25,6 +27,7 @@ class ArtemisMessagingTests { val hostAndPort = freeLocalHostAndPort() val topic = "platform.self" + val identity = generateKeyPair() val config = object : NodeConfiguration { override val myLegalName: String = "me" override val exportJMXto: String = "" @@ -36,6 +39,8 @@ class ArtemisMessagingTests { var messagingClient: ArtemisMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null + val networkMapCache = InMemoryNetworkMapCache() + @After fun cleanUp() { messagingClient?.stop() @@ -98,16 +103,15 @@ class ArtemisMessagingTests { assertNull(receivedMessages.poll(200, MILLISECONDS)) } - private fun createMessagingClient(server: HostAndPort = hostAndPort, - local: HostAndPort = hostAndPort): ArtemisMessagingClient { - return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply { + private fun createMessagingClient(server: HostAndPort = hostAndPort): ArtemisMessagingClient { + return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, identity.public, AffinityExecutor.SAME_THREAD).apply { configureWithDevSSLCertificate() messagingClient = this } } private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { - return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local).apply { + return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local, networkMapCache).apply { configureWithDevSSLCertificate() messagingServer = this } diff --git a/scripts/get-rate-fix.sh b/scripts/get-rate-fix.sh index 53e30facd0..5bda2d2391 100755 --- a/scripts/get-rate-fix.sh +++ b/scripts/get-rate-fix.sh @@ -21,4 +21,4 @@ fi # Upload the rates to the buyer node curl -F rates=@scripts/example.rates.txt http://localhost:31338/upload/interest-rates -$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public --oracle=localhost --oracle-identity-file=build/trader-demo/buyer/identity-public \ No newline at end of file +$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index 0d72427503..3138b98073 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -374,7 +374,7 @@ private fun runTrade(cliParams: CliParams.Trade): Int { private fun createRecipient(addr: String): SingleMessageRecipient { val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT) - return ArtemisMessagingClient.makeRecipient(hostAndPort) + return ArtemisMessagingClient.makeNetworkMapAddress(hostAndPort) } private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient): Node { diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 139661a05b..2010f4f24a 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -4,10 +4,10 @@ import com.google.common.net.HostAndPort import com.r3corda.contracts.asset.Cash import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party -import com.r3corda.core.hours import com.r3corda.core.logElapsedTime import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.LogHelper @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory import java.math.BigDecimal import java.nio.file.Files import java.nio.file.Paths +import java.util.* import kotlin.system.exitProcess private val log: Logger = LoggerFactory.getLogger("RatesFixDemo") @@ -38,8 +39,6 @@ fun main(args: Array) { val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data") val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required() val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required() - val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required() - val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required() val fixOfArg = parser.accepts("fix-of").withRequiredArg().defaultsTo("ICE LIBOR 2016-03-16 1M") val expectedRateArg = parser.accepts("expected-rate").withRequiredArg().defaultsTo("0.67") @@ -56,28 +55,24 @@ fun main(args: Array) { LogHelper.setLevel("+RatesFixDemo", "-org.apache.activemq") val dir = Paths.get(options.valueOf(dirArg)) - val networkMapAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(networkMapAddrArg)) + val networkMapAddr = ArtemisMessagingClient.makeNetworkMapAddress(HostAndPort.fromString(options.valueOf(networkMapAddrArg))) val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize() val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type)) - // Load oracle stuff (in lieu of having a network map service) - val oracleAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(oracleAddrArg)) - val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize() - val oracleNode = NodeInfo(oracleAddr, oracleIdentity) - val fixOf: FixOf = NodeInterestRates.parseFixOf(options.valueOf(fixOfArg)) val expectedRate = BigDecimal(options.valueOf(expectedRateArg)) val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg)) // Bring up node. val advertisedServices: Set = emptySet() - val myNetAddr = ArtemisMessagingClient.toHostAndPort(options.valueOf(networkAddressArg)) + val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)) val config = object : NodeConfiguration { override val myLegalName: String = "Rate fix demo node" override val exportJMXto: String = "http" override val nearestCity: String = "Atlantis" override val keyStorePassword: String = "cordacadevpass" override val trustStorePassword: String = "trustpass" + override val dataSourceProperties: Properties = makeTestDataSourceProperties() } val apiAddr = HostAndPort.fromParts(myNetAddr.hostText, myNetAddr.port + 1) @@ -86,11 +81,12 @@ fun main(args: Array) { advertisedServices, DemoClock()).setup().start() } node.networkMapRegistrationFuture.get() val notaryNode = node.services.networkMapCache.notaryNodes[0] + val rateOracle = node.services.networkMapCache.ratesOracleNodes[0] // Make a garbage transaction that includes a rate fix. val tx = TransactionType.General.Builder(notaryNode.identity) tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity)) - val protocol = RatesFixProtocol(tx, oracleNode.identity, fixOf, expectedRate, rateTolerance) + val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance) node.smm.add("demo.ratefix", protocol).get() node.stop() diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 4b591469d1..7c3ce5c88c 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -140,7 +140,7 @@ fun main(args: Array) { val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public") val party = Files.readAllBytes(path).deserialize() advertisedServices = emptySet() - NodeInfo(ArtemisMessagingClient.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type)) + NodeInfo(ArtemisMessagingClient.makeNetworkMapAddress(theirNetAddr), party, setOf(NetworkMapService.Type)) } // And now construct then start the node object. It takes a little while.