diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt index d909a8eb79..001ef52f50 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/NetworkMapCache.kt @@ -20,8 +20,8 @@ interface NetworkMapCache { val logger = LoggerFactory.getLogger(NetworkMapCache::class.java) } - enum class MapChangeType { Added, Removed } - data class MapChange(val node: NodeInfo, val type: MapChangeType ) + enum class MapChangeType { Added, Removed, Modified } + data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType ) /** A list of nodes that advertise a network map service */ val networkMapNodes: List<NodeInfo> diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 6db3a1c656..8bccbc1540 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -63,9 +63,8 @@ interface DriverDSLExposedInterface { * * @param providedName name of the client, which will be used for creating its directory. * @param serverAddress the artemis server to connect to, for example a [Node]. - * @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null. */ - fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future<ArtemisMessagingClient> + fun startClient(providedName: String, serverAddress: HostAndPort): Future<ArtemisMessagingClient> /** * Starts a local [ArtemisMessagingServer] of which there may only be one. */ @@ -75,13 +74,12 @@ interface DriverDSLExposedInterface { } fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) = - startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort) + startClient("driver-local-server-client", localServer.myHostPort) fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) = startClient( providedName = providedName ?: "${remoteNodeInfo.identity.name}-client", - serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort, - clientAddress = null + serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address) ) interface DriverDSLInternalInterface : DriverDSLExposedInterface { @@ -224,6 +222,7 @@ class DriverDSL( private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() private var networkMapNodeInfo: NodeInfo? = null + private val identity = generateKeyPair() class State { val registeredProcesses = LinkedList<Process>() @@ -322,8 +321,7 @@ class DriverDSL( override fun startClient( providedName: String, - serverAddress: HostAndPort, - clientAddress: HostAndPort? + serverAddress: HostAndPort ): Future<ArtemisMessagingClient> { val nodeConfiguration = NodeConfigurationFromConfig( @@ -339,8 +337,9 @@ class DriverDSL( Paths.get(baseDirectory, providedName), nodeConfiguration, serverHostPort = serverAddress, - myHostPort = clientAddress ?: serverAddress, - executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1) + myIdentity = identity.public, + executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1), + persistentInbox = false // Do not create a permanent queue for our transient UI identity ) return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingClient> { @@ -368,7 +367,8 @@ class DriverDSL( val server = ArtemisMessagingServer( Paths.get(baseDirectory, name), config, - portAllocation.nextHostAndPort() + portAllocation.nextHostAndPort(), + networkMapCache ) return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> { server.configureWithDevSSLCertificate() @@ -383,11 +383,11 @@ class DriverDSL( override fun start() { startNetworkMapService() - val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get() + val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get() // We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from // the network map service itself. val fakeNodeInfo = NodeInfo( - address = ArtemisMessagingClient.makeRecipient(networkMapAddress), + address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress), identity = Party( name = networkMapName, owningKey = generateKeyPair().public diff --git a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt index 8a7904cce4..1dad9bebb2 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/NodeRunner.kt @@ -35,7 +35,7 @@ class NodeRunner { val networkMapNodeInfo = if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) { NodeInfo( - address = ArtemisMessagingClient.makeRecipient(networkMapAddress), + address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress), identity = Party( name = networkMapName, owningKey = networkMapPublicKey diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index e5e949f8a2..89a899e264 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -164,8 +164,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val storageServices = initialiseStorageService(dir) storage = storageServices.first checkpointStorage = storageServices.second - net = makeMessagingService() netMapCache = InMemoryNetworkMapCache() + net = makeMessagingService() wallet = makeWalletService() identity = makeIdentityService() diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 245811f0fb..0b7828e04e 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -112,11 +112,16 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, override fun makeMessagingService(): MessagingServiceInternal { val serverAddr = messagingServerAddr ?: { - messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr) + messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache) p2pAddr }() - - return ArtemisMessagingClient(dir, configuration, serverAddr, p2pAddr, serverThread) + if (networkMapService != null) { + return ArtemisMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread) + } + else + { + return ArtemisMessagingClient(dir, configuration, serverAddr, null, serverThread) + } } override fun startMessagingService() { @@ -124,6 +129,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, messageBroker?.apply { configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process start() + bridgeToNetworkMapService(networkMapService) } // Start up the MQ client. diff --git a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt index ff91468a38..71c7361b2c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt @@ -112,7 +112,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration { val clock: Clock = NodeClock() fun createNode(): Node { - val networkMapTarget = ArtemisMessagingClient.makeRecipient(mapService.address) + val networkMapTarget = ArtemisMessagingClient.makeNetworkMapAddress(mapService.address) val advertisedServices = mutableSetOf<ServiceType>() if (mapService.hostServiceLocally) advertisedServices.add(NetworkMapService.Type) if (hostNotaryServiceLocally) advertisedServices.add(SimpleNotaryService.Type) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt index 658b1aa2e2..aecefec74f 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingClient.kt @@ -5,7 +5,6 @@ import com.r3corda.core.ThreadBox import com.r3corda.core.messaging.* import com.r3corda.core.serialization.opaque import com.r3corda.core.utilities.loggerFor -import com.r3corda.node.internal.Node import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.utilities.AffinityExecutor @@ -14,8 +13,8 @@ import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* import java.nio.file.FileSystems import java.nio.file.Path +import java.security.PublicKey import java.time.Instant -import java.util.* import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.Executor @@ -31,14 +30,19 @@ import javax.annotation.concurrent.ThreadSafe * through into Artemis and from there, back through to senders. * * @param serverHostPort The address of the broker instance to connect to (might be running in the same process) - * @param myHostPort What host and port to use as an address for incoming messages + * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate + * that this is a NetworkMapService node which will be bound globally to the name "networkmap" + * @param executor An executor to run received message tasks upon. + * @param persistentInbox If true the inbox will be created persistent if not already created. + * If false the inbox queue will be transient, which is appropriate for UI clients for example. */ @ThreadSafe class ArtemisMessagingClient(directory: Path, config: NodeConfiguration, val serverHostPort: HostAndPort, - val myHostPort: HostAndPort, - val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal { + val myIdentity: PublicKey?, + val executor: AffinityExecutor, + val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal { companion object { val log = loggerFor<ArtemisMessagingClient>() @@ -50,17 +54,19 @@ class ArtemisMessagingClient(directory: Path, val SESSION_ID_PROPERTY = "session-id" - /** Temp helper until network map is established. */ - fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort) - - fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname)) - fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT) + /** + * This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node. + * All other addresses come from the NetworkMapCache, or myAddress below. + * The node will populate with their own identity based address when they register with the NetworkMapService. + */ + fun makeNetworkMapAddress(hostAndPort: HostAndPort): SingleMessageRecipient = NetworkMapAddress(hostAndPort) } private class InnerState { var started = false var running = false - val producers = HashMap<Address, ClientProducer>() + val knownQueues = mutableSetOf<SimpleString>() + var producer: ClientProducer? = null var consumer: ClientConsumer? = null var session: ClientSession? = null var clientFactory: ClientSessionFactory? = null @@ -71,7 +77,10 @@ class ArtemisMessagingClient(directory: Path, val topicSession: TopicSession, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration - override val myAddress: SingleMessageRecipient = Address(myHostPort) + /** + * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. + */ + override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort) private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList<Handler>() @@ -94,14 +103,20 @@ class ArtemisMessagingClient(directory: Path, val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) clientFactory = locator.createSessionFactory() - // Create a queue on which to receive messages and set up the handler. - val session = clientFactory!!.createSession() + // Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!) + val session = clientFactory!!.createSession(true, true, 1) this.session = session - val address = myHostPort.toString() - val queueName = myHostPort.toString() - session.createQueue(address, queueName, false) + // Create a queue on which to receive messages and set up the handler. + val queueName = toQueueName(myAddress) + val query = session.queueQuery(queueName) + if (!query.isExists) { + session.createQueue(queueName, queueName, persistentInbox) + } + knownQueues.add(queueName) consumer = session.createConsumer(queueName) + producer = session.createProducer() + session.start() } } @@ -166,6 +181,7 @@ class ArtemisMessagingClient(directory: Path, } val topic = message.getStringProperty(TOPIC_PROPERTY) val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) + log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID") val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } @@ -243,9 +259,10 @@ class ArtemisMessagingClient(directory: Path, shutdownLatch.await() } state.locked { - for (producer in producers.values) producer.close() - producers.clear() - + producer?.close() + producer = null + // Ensure any trailing messages are committed to the journal + session!!.commit() // Closing the factory closes all the sessions it produced as well. clientFactory!!.close() clientFactory = null @@ -253,9 +270,7 @@ class ArtemisMessagingClient(directory: Path, } override fun send(message: Message, target: MessageRecipients) { - if (target !is Address) - TODO("Only simple sends to single recipients are currently implemented") - + val queueName = toQueueName(target) state.locked { val artemisMessage = session!!.createMessage(true).apply { val sessionID = message.topicSession.sessionID @@ -264,21 +279,20 @@ class ArtemisMessagingClient(directory: Path, writeBodyBufferBytes(message.data) } - val producer = producers.getOrPut(target) { - if (target != myAddress) - maybeCreateQueue(target.hostAndPort) - session!!.createProducer(target.hostAndPort.toString()) + if (knownQueues.add(queueName)) { + maybeCreateQueue(queueName) } - producer.send(artemisMessage) + log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID}") + producer!!.send(queueName, artemisMessage) } } - private fun maybeCreateQueue(hostAndPort: HostAndPort) { + private fun maybeCreateQueue(queueName: SimpleString) { state.alreadyLocked { - val name = hostAndPort.toString() - val queueQuery = session!!.queueQuery(SimpleString(name)) + val queueQuery = session!!.queueQuery(queueName) if (!queueQuery.isExists) { - session!!.createQueue(name, name, true /* durable */) + log.info("create client queue $queueName") + session!!.createQueue(queueName, queueName, true /* durable */) } } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt index 39231d4e09..dda497d869 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -2,15 +2,20 @@ package com.r3corda.node.services.messaging import com.google.common.net.HostAndPort import com.r3corda.core.crypto.X509Utilities +import com.r3corda.core.crypto.parsePublicKeyBase58 +import com.r3corda.core.crypto.toBase58String +import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import java.nio.file.Files import java.nio.file.Path +import java.security.PublicKey /** * The base class for Artemis services that defines shared data structures and transport configuration @@ -22,8 +27,69 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks") private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks") + + companion object { + const val PEERS_PREFIX = "peers." + + @JvmStatic + protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap") + + /** + * Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should + * only be used in unit tests and the internals of the messaging services to keep addressing opaque for the future. + * N.B. Marked as JvmStatic to allow use in the inherited classes. + */ + @JvmStatic + internal fun toHostAndPort(target: MessageRecipients): HostAndPort { + val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") + return addr.hostAndPort + } + + /** + * Assuming the passed in target address is actually an ArtemisAddress will extract the queue name used. + * For now the queue name is the Base58 version of the node's identity. + * This should only be used in the internals of the messaging services to keep addressing opaque for the future. + * N.B. Marked as JvmStatic to allow use in the inherited classes. + */ + @JvmStatic + protected fun toQueueName(target: MessageRecipients): SimpleString { + val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") + return addr.queueName + + } + } + + protected interface ArtemisAddress { + val queueName: SimpleString + val hostAndPort: HostAndPort + } + + protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { + override val queueName: SimpleString = NETWORK_MAP_ADDRESS + } + // In future: can contain onion routing info, etc. - data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient + protected data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { + override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) } + + override fun toString(): String { + return "NodeAddress(identity = $queueName, $hostAndPort" + } + } + + protected fun tryParseKeyFromQueueName(queueName: SimpleString): PublicKey? { + val name = queueName.toString() + if(!name.startsWith(PEERS_PREFIX)) { + return null + } + val keyCode = name.substring(PEERS_PREFIX.length) + return try { + parsePublicKeyBase58(keyCode) + } catch (ex: Exception) { + null + } + + } protected enum class ConnectionDirection { INBOUND, OUTBOUND } diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt index 9012cf5141..08542ca89a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt @@ -3,9 +3,11 @@ package com.r3corda.node.services.messaging import com.google.common.net.HostAndPort import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.newSecureRandom -import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.node.NodeInfo +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.utilities.loggerFor import com.r3corda.node.services.config.NodeConfiguration +import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl @@ -15,6 +17,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule +import rx.Subscription import java.math.BigInteger import java.nio.file.Path import javax.annotation.concurrent.ThreadSafe @@ -35,7 +38,8 @@ import javax.annotation.concurrent.ThreadSafe @ThreadSafe class ArtemisMessagingServer(directory: Path, config: NodeConfiguration, - val myHostPort: HostAndPort) : ArtemisMessagingComponent(directory, config) { + val myHostPort: HostAndPort, + val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent(directory, config) { companion object { val log = loggerFor<ArtemisMessagingServer>() } @@ -44,22 +48,73 @@ class ArtemisMessagingServer(directory: Path, var running = false } - val myAddress: SingleMessageRecipient = Address(myHostPort) private val mutex = ThreadBox(InnerState()) private lateinit var activeMQServer: ActiveMQServer + private var networkChangeHandle: Subscription? = null fun start() = mutex.locked { if (!running) { configureAndStartServer() + networkChangeHandle = networkMapCache.changed.subscribe { onNetworkChange(it) } running = true } } fun stop() = mutex.locked { + networkChangeHandle?.unsubscribe() + networkChangeHandle = null activeMQServer.stop() running = false } + fun bridgeToNetworkMapService(networkMapService: NodeInfo?) { + if ((networkMapService != null) && (networkMapService.address is NetworkMapAddress)) { + val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS) + if (!query.isExists) { + activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false) + } + + maybeDeployBridgeForAddress(NETWORK_MAP_ADDRESS, networkMapService) + } + } + + private fun onNetworkChange(change: NetworkMapCache.MapChange) { + val address = change.node.address + if (address is ArtemisMessagingComponent.ArtemisAddress) { + val queueName = address.queueName + when (change.type) { + NetworkMapCache.MapChangeType.Added -> { + val query = activeMQServer.queueQuery(queueName) + if (query.isExists) { + // Queue exists so now wire up bridge + maybeDeployBridgeForAddress(queueName, change.node) + } + } + + NetworkMapCache.MapChangeType.Modified -> { + (change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let { + // remove any previous possibly different bridge + maybeDestroyBridge(it.queueName) + } + val query = activeMQServer.queueQuery(queueName) + if (query.isExists) { + // Deploy new bridge + maybeDeployBridgeForAddress(queueName, change.node) + } + } + + NetworkMapCache.MapChangeType.Removed -> { + (change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let { + // Remove old bridge + maybeDestroyBridge(it.queueName) + } + // just in case of NetworkMapCache version issues + maybeDestroyBridge(queueName) + } + } + } + } + private fun configureAndStartServer() { val config = createArtemisConfig(directory, myHostPort).apply { securityRoles = mapOf( @@ -74,8 +129,16 @@ class ArtemisMessagingServer(directory: Path, registerActivationFailureListener { exception -> throw exception } // Deploy bridge for a newly created queue registerPostQueueCreationCallback { queueName -> - log.trace("Queue created: $queueName") - maybeDeployBridgeForAddress(queueName.toString()) + log.info("Queue created: $queueName") + if (queueName != NETWORK_MAP_ADDRESS) { + val identity = tryParseKeyFromQueueName(queueName) + if (identity != null) { + val nodeInfo = networkMapCache.getNodeByPublicKey(identity) + if (nodeInfo != null) { + maybeDeployBridgeForAddress(queueName, nodeInfo) + } + } + } } } activeMQServer.start() @@ -102,35 +165,53 @@ class ArtemisMessagingServer(directory: Path, return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig) } + fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations + + fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration( + hostAndPort.toString(), + tcpTransport( + ConnectionDirection.OUTBOUND, + hostAndPort.hostText, + hostAndPort.port + ) + ) + + fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString()) + + fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) = activeMQServer.deployBridge(BridgeConfiguration().apply { + val nameStr = name.toString() + setName(nameStr) + queueName = nameStr + forwardingAddress = nameStr + staticConnectors = listOf(hostAndPort.toString()) + confirmationWindowSize = 100000 // a guess + }) + /** * For every queue created we need to have a bridge deployed in case the address of the queue * is that of a remote party */ - private fun maybeDeployBridgeForAddress(name: String) { - val hostAndPort = HostAndPort.fromString(name) + private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: NodeInfo) { + val hostAndPort = toHostAndPort(nodeInfo.address) - fun connectorExists() = name in activeMQServer.configuration.connectorConfigurations + if (hostAndPort == myHostPort) { + return + } - fun addConnector() = activeMQServer.configuration.addConnectorConfiguration( - name, - tcpTransport( - ConnectionDirection.OUTBOUND, - hostAndPort.hostText, - hostAndPort.port - ) - ) + if (!connectorExists(hostAndPort)) { + log.info("add connector $hostAndPort") + addConnector(hostAndPort) + } - fun deployBridge() = activeMQServer.deployBridge(BridgeConfiguration().apply { - setName(name) - queueName = name - forwardingAddress = name - staticConnectors = listOf(name) - confirmationWindowSize = 100000 // a guess - }) + if (!bridgeExists(name)) { + log.info("add bridge $hostAndPort $name") + deployBridge(hostAndPort, name) + } + } - if (!connectorExists()) { - addConnector() - deployBridge() + private fun maybeDestroyBridge(name: SimpleString) { + if (bridgeExists(name)) { + activeMQServer.destroyBridge(name.toString()) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index 52c5e95944..b76dd04229 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -98,13 +98,17 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach } override fun addNode(node: NodeInfo) { - registeredNodes[node.identity] = node - _changed.onNext(MapChange(node, MapChangeType.Added)) + val oldValue = registeredNodes.put(node.identity, node) + if (oldValue == null) { + _changed.onNext(MapChange(node, oldValue, MapChangeType.Added)) + } else if(oldValue != node) { + _changed.onNext(MapChange(node, oldValue, MapChangeType.Modified)) + } } override fun removeNode(node: NodeInfo) { - registeredNodes.remove(node.identity) - _changed.onNext(MapChange(node, MapChangeType.Removed)) + val oldValue = registeredNodes.remove(node.identity) + _changed.onNext(MapChange(node, oldValue, MapChangeType.Removed)) } /** diff --git a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt index 9e88ac3967..4e5b421e1b 100644 --- a/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/driver/DriverTests.kt @@ -4,7 +4,6 @@ import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.messaging.ArtemisMessagingComponent -import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.SimpleNotaryService import org.junit.Test @@ -12,7 +11,7 @@ import org.junit.Test class DriverTests { companion object { fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) { - val address = nodeInfo.address as ArtemisMessagingComponent.Address + val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the node is registered in the network map poll("network map cache for $nodeName") { networkMapCache.get().firstOrNull { @@ -20,13 +19,13 @@ class DriverTests { } } // Check that the port is bound - addressMustBeBound(address.hostAndPort) + addressMustBeBound(hostAndPort) } fun nodeMustBeDown(nodeInfo: NodeInfo) { - val address = nodeInfo.address as ArtemisMessagingComponent.Address + val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the port is bound - addressMustNotBeBound(address.hostAndPort) + addressMustNotBeBound(hostAndPort) } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt index aab6e5587f..33f15192f3 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -1,12 +1,14 @@ package com.r3corda.node.services import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.messaging.Message import com.r3corda.core.node.services.DEFAULT_SESSION_ID import com.r3corda.core.testing.freeLocalHostAndPort import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingClient import com.r3corda.node.services.messaging.ArtemisMessagingServer +import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.utilities.AffinityExecutor import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.After @@ -25,6 +27,7 @@ class ArtemisMessagingTests { val hostAndPort = freeLocalHostAndPort() val topic = "platform.self" + val identity = generateKeyPair() val config = object : NodeConfiguration { override val myLegalName: String = "me" override val exportJMXto: String = "" @@ -36,6 +39,8 @@ class ArtemisMessagingTests { var messagingClient: ArtemisMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null + val networkMapCache = InMemoryNetworkMapCache() + @After fun cleanUp() { messagingClient?.stop() @@ -98,16 +103,15 @@ class ArtemisMessagingTests { assertNull(receivedMessages.poll(200, MILLISECONDS)) } - private fun createMessagingClient(server: HostAndPort = hostAndPort, - local: HostAndPort = hostAndPort): ArtemisMessagingClient { - return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply { + private fun createMessagingClient(server: HostAndPort = hostAndPort): ArtemisMessagingClient { + return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, identity.public, AffinityExecutor.SAME_THREAD).apply { configureWithDevSSLCertificate() messagingClient = this } } private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { - return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local).apply { + return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local, networkMapCache).apply { configureWithDevSSLCertificate() messagingServer = this } diff --git a/scripts/get-rate-fix.sh b/scripts/get-rate-fix.sh index 53e30facd0..5bda2d2391 100755 --- a/scripts/get-rate-fix.sh +++ b/scripts/get-rate-fix.sh @@ -21,4 +21,4 @@ fi # Upload the rates to the buyer node curl -F rates=@scripts/example.rates.txt http://localhost:31338/upload/interest-rates -$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public --oracle=localhost --oracle-identity-file=build/trader-demo/buyer/identity-public \ No newline at end of file +$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt index 0d72427503..3138b98073 100644 --- a/src/main/kotlin/com/r3corda/demos/IRSDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/IRSDemo.kt @@ -374,7 +374,7 @@ private fun runTrade(cliParams: CliParams.Trade): Int { private fun createRecipient(addr: String): SingleMessageRecipient { val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT) - return ArtemisMessagingClient.makeRecipient(hostAndPort) + return ArtemisMessagingClient.makeNetworkMapAddress(hostAndPort) } private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient): Node { diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 139661a05b..2010f4f24a 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -4,10 +4,10 @@ import com.google.common.net.HostAndPort import com.r3corda.contracts.asset.Cash import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party -import com.r3corda.core.hours import com.r3corda.core.logElapsedTime import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType +import com.r3corda.core.node.services.testing.makeTestDataSourceProperties import com.r3corda.core.serialization.deserialize import com.r3corda.core.utilities.Emoji import com.r3corda.core.utilities.LogHelper @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory import java.math.BigDecimal import java.nio.file.Files import java.nio.file.Paths +import java.util.* import kotlin.system.exitProcess private val log: Logger = LoggerFactory.getLogger("RatesFixDemo") @@ -38,8 +39,6 @@ fun main(args: Array<String>) { val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data") val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required() val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required() - val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required() - val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required() val fixOfArg = parser.accepts("fix-of").withRequiredArg().defaultsTo("ICE LIBOR 2016-03-16 1M") val expectedRateArg = parser.accepts("expected-rate").withRequiredArg().defaultsTo("0.67") @@ -56,28 +55,24 @@ fun main(args: Array<String>) { LogHelper.setLevel("+RatesFixDemo", "-org.apache.activemq") val dir = Paths.get(options.valueOf(dirArg)) - val networkMapAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(networkMapAddrArg)) + val networkMapAddr = ArtemisMessagingClient.makeNetworkMapAddress(HostAndPort.fromString(options.valueOf(networkMapAddrArg))) val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>() val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type)) - // Load oracle stuff (in lieu of having a network map service) - val oracleAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(oracleAddrArg)) - val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>() - val oracleNode = NodeInfo(oracleAddr, oracleIdentity) - val fixOf: FixOf = NodeInterestRates.parseFixOf(options.valueOf(fixOfArg)) val expectedRate = BigDecimal(options.valueOf(expectedRateArg)) val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg)) // Bring up node. val advertisedServices: Set<ServiceType> = emptySet() - val myNetAddr = ArtemisMessagingClient.toHostAndPort(options.valueOf(networkAddressArg)) + val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg)) val config = object : NodeConfiguration { override val myLegalName: String = "Rate fix demo node" override val exportJMXto: String = "http" override val nearestCity: String = "Atlantis" override val keyStorePassword: String = "cordacadevpass" override val trustStorePassword: String = "trustpass" + override val dataSourceProperties: Properties = makeTestDataSourceProperties() } val apiAddr = HostAndPort.fromParts(myNetAddr.hostText, myNetAddr.port + 1) @@ -86,11 +81,12 @@ fun main(args: Array<String>) { advertisedServices, DemoClock()).setup().start() } node.networkMapRegistrationFuture.get() val notaryNode = node.services.networkMapCache.notaryNodes[0] + val rateOracle = node.services.networkMapCache.ratesOracleNodes[0] // Make a garbage transaction that includes a rate fix. val tx = TransactionType.General.Builder(notaryNode.identity) tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity)) - val protocol = RatesFixProtocol(tx, oracleNode.identity, fixOf, expectedRate, rateTolerance) + val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance) node.smm.add("demo.ratefix", protocol).get() node.stop() diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 4b591469d1..7c3ce5c88c 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -140,7 +140,7 @@ fun main(args: Array<String>) { val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public") val party = Files.readAllBytes(path).deserialize<Party>() advertisedServices = emptySet() - NodeInfo(ArtemisMessagingClient.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type)) + NodeInfo(ArtemisMessagingClient.makeNetworkMapAddress(theirNetAddr), party, setOf(NetworkMapService.Type)) } // And now construct then start the node object. It takes a little while.