mirror of
https://github.com/corda/corda.git
synced 2025-02-20 17:33:15 +00:00
Only NetworkMapServer addresses can be publicly manufactured. Use identity publick key as addressing, with only bridges using the HostAndPort information
Fixup after rebase and fix issue with checking previous deployment of bridges Correct comments on ArtemisMessagingClient constructor Fixup rates fix demo Get rid of when statements Make NetworkMapCache send modify as well as add//remove events. Make inboxes for nodes persistent. Suppress warnings Fix message acknowledgement so that it actually consumes messages properly. Change queueName to SimpleString to stop lots of wasted conversions Get rid of spurious import Tidy up and add comments Update to include comments on PR Remove unnecessary import
This commit is contained in:
parent
99a0da9202
commit
089ba2cb69
@ -20,8 +20,8 @@ interface NetworkMapCache {
|
||||
val logger = LoggerFactory.getLogger(NetworkMapCache::class.java)
|
||||
}
|
||||
|
||||
enum class MapChangeType { Added, Removed }
|
||||
data class MapChange(val node: NodeInfo, val type: MapChangeType )
|
||||
enum class MapChangeType { Added, Removed, Modified }
|
||||
data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType )
|
||||
|
||||
/** A list of nodes that advertise a network map service */
|
||||
val networkMapNodes: List<NodeInfo>
|
||||
|
@ -63,9 +63,8 @@ interface DriverDSLExposedInterface {
|
||||
*
|
||||
* @param providedName name of the client, which will be used for creating its directory.
|
||||
* @param serverAddress the artemis server to connect to, for example a [Node].
|
||||
* @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null.
|
||||
*/
|
||||
fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future<ArtemisMessagingClient>
|
||||
fun startClient(providedName: String, serverAddress: HostAndPort): Future<ArtemisMessagingClient>
|
||||
/**
|
||||
* Starts a local [ArtemisMessagingServer] of which there may only be one.
|
||||
*/
|
||||
@ -75,13 +74,12 @@ interface DriverDSLExposedInterface {
|
||||
}
|
||||
|
||||
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
|
||||
startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort)
|
||||
startClient("driver-local-server-client", localServer.myHostPort)
|
||||
|
||||
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
|
||||
startClient(
|
||||
providedName = providedName ?: "${remoteNodeInfo.identity.name}-client",
|
||||
serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort,
|
||||
clientAddress = null
|
||||
serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address)
|
||||
)
|
||||
|
||||
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
|
||||
@ -224,6 +222,7 @@ class DriverDSL(
|
||||
private val networkMapName = "NetworkMapService"
|
||||
private val networkMapAddress = portAllocation.nextHostAndPort()
|
||||
private var networkMapNodeInfo: NodeInfo? = null
|
||||
private val identity = generateKeyPair()
|
||||
|
||||
class State {
|
||||
val registeredProcesses = LinkedList<Process>()
|
||||
@ -322,8 +321,7 @@ class DriverDSL(
|
||||
|
||||
override fun startClient(
|
||||
providedName: String,
|
||||
serverAddress: HostAndPort,
|
||||
clientAddress: HostAndPort?
|
||||
serverAddress: HostAndPort
|
||||
): Future<ArtemisMessagingClient> {
|
||||
|
||||
val nodeConfiguration = NodeConfigurationFromConfig(
|
||||
@ -339,8 +337,9 @@ class DriverDSL(
|
||||
Paths.get(baseDirectory, providedName),
|
||||
nodeConfiguration,
|
||||
serverHostPort = serverAddress,
|
||||
myHostPort = clientAddress ?: serverAddress,
|
||||
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1)
|
||||
myIdentity = identity.public,
|
||||
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1),
|
||||
persistentInbox = false // Do not create a permanent queue for our transient UI identity
|
||||
)
|
||||
|
||||
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingClient> {
|
||||
@ -368,7 +367,8 @@ class DriverDSL(
|
||||
val server = ArtemisMessagingServer(
|
||||
Paths.get(baseDirectory, name),
|
||||
config,
|
||||
portAllocation.nextHostAndPort()
|
||||
portAllocation.nextHostAndPort(),
|
||||
networkMapCache
|
||||
)
|
||||
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
|
||||
server.configureWithDevSSLCertificate()
|
||||
@ -383,11 +383,11 @@ class DriverDSL(
|
||||
|
||||
override fun start() {
|
||||
startNetworkMapService()
|
||||
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get()
|
||||
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get()
|
||||
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
|
||||
// the network map service itself.
|
||||
val fakeNodeInfo = NodeInfo(
|
||||
address = ArtemisMessagingClient.makeRecipient(networkMapAddress),
|
||||
address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress),
|
||||
identity = Party(
|
||||
name = networkMapName,
|
||||
owningKey = generateKeyPair().public
|
||||
|
@ -35,7 +35,7 @@ class NodeRunner {
|
||||
val networkMapNodeInfo =
|
||||
if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) {
|
||||
NodeInfo(
|
||||
address = ArtemisMessagingClient.makeRecipient(networkMapAddress),
|
||||
address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress),
|
||||
identity = Party(
|
||||
name = networkMapName,
|
||||
owningKey = networkMapPublicKey
|
||||
|
@ -164,8 +164,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
val storageServices = initialiseStorageService(dir)
|
||||
storage = storageServices.first
|
||||
checkpointStorage = storageServices.second
|
||||
net = makeMessagingService()
|
||||
netMapCache = InMemoryNetworkMapCache()
|
||||
net = makeMessagingService()
|
||||
wallet = makeWalletService()
|
||||
|
||||
identity = makeIdentityService()
|
||||
|
@ -112,11 +112,16 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
|
||||
override fun makeMessagingService(): MessagingServiceInternal {
|
||||
val serverAddr = messagingServerAddr ?: {
|
||||
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr)
|
||||
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache)
|
||||
p2pAddr
|
||||
}()
|
||||
|
||||
return ArtemisMessagingClient(dir, configuration, serverAddr, p2pAddr, serverThread)
|
||||
if (networkMapService != null) {
|
||||
return ArtemisMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread)
|
||||
}
|
||||
else
|
||||
{
|
||||
return ArtemisMessagingClient(dir, configuration, serverAddr, null, serverThread)
|
||||
}
|
||||
}
|
||||
|
||||
override fun startMessagingService() {
|
||||
@ -124,6 +129,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
messageBroker?.apply {
|
||||
configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process
|
||||
start()
|
||||
bridgeToNetworkMapService(networkMapService)
|
||||
}
|
||||
|
||||
// Start up the MQ client.
|
||||
|
@ -112,7 +112,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
|
||||
val clock: Clock = NodeClock()
|
||||
|
||||
fun createNode(): Node {
|
||||
val networkMapTarget = ArtemisMessagingClient.makeRecipient(mapService.address)
|
||||
val networkMapTarget = ArtemisMessagingClient.makeNetworkMapAddress(mapService.address)
|
||||
val advertisedServices = mutableSetOf<ServiceType>()
|
||||
if (mapService.hostServiceLocally) advertisedServices.add(NetworkMapService.Type)
|
||||
if (hostNotaryServiceLocally) advertisedServices.add(SimpleNotaryService.Type)
|
||||
|
@ -5,7 +5,6 @@ import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.serialization.opaque
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
@ -14,8 +13,8 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executor
|
||||
@ -31,14 +30,19 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* through into Artemis and from there, back through to senders.
|
||||
*
|
||||
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
|
||||
* @param myHostPort What host and port to use as an address for incoming messages
|
||||
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
|
||||
* that this is a NetworkMapService node which will be bound globally to the name "networkmap"
|
||||
* @param executor An executor to run received message tasks upon.
|
||||
* @param persistentInbox If true the inbox will be created persistent if not already created.
|
||||
* If false the inbox queue will be transient, which is appropriate for UI clients for example.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingClient(directory: Path,
|
||||
config: NodeConfiguration,
|
||||
val serverHostPort: HostAndPort,
|
||||
val myHostPort: HostAndPort,
|
||||
val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
|
||||
val myIdentity: PublicKey?,
|
||||
val executor: AffinityExecutor,
|
||||
val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
|
||||
companion object {
|
||||
val log = loggerFor<ArtemisMessagingClient>()
|
||||
|
||||
@ -50,17 +54,19 @@ class ArtemisMessagingClient(directory: Path,
|
||||
|
||||
val SESSION_ID_PROPERTY = "session-id"
|
||||
|
||||
/** Temp helper until network map is established. */
|
||||
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
|
||||
|
||||
fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname))
|
||||
fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT)
|
||||
/**
|
||||
* This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node.
|
||||
* All other addresses come from the NetworkMapCache, or myAddress below.
|
||||
* The node will populate with their own identity based address when they register with the NetworkMapService.
|
||||
*/
|
||||
fun makeNetworkMapAddress(hostAndPort: HostAndPort): SingleMessageRecipient = NetworkMapAddress(hostAndPort)
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
var started = false
|
||||
var running = false
|
||||
val producers = HashMap<Address, ClientProducer>()
|
||||
val knownQueues = mutableSetOf<SimpleString>()
|
||||
var producer: ClientProducer? = null
|
||||
var consumer: ClientConsumer? = null
|
||||
var session: ClientSession? = null
|
||||
var clientFactory: ClientSessionFactory? = null
|
||||
@ -71,7 +77,10 @@ class ArtemisMessagingClient(directory: Path,
|
||||
val topicSession: TopicSession,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
override val myAddress: SingleMessageRecipient = Address(myHostPort)
|
||||
/**
|
||||
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
|
||||
*/
|
||||
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
@ -94,14 +103,20 @@ class ArtemisMessagingClient(directory: Path,
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
|
||||
clientFactory = locator.createSessionFactory()
|
||||
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
val session = clientFactory!!.createSession()
|
||||
// Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!)
|
||||
val session = clientFactory!!.createSession(true, true, 1)
|
||||
this.session = session
|
||||
|
||||
val address = myHostPort.toString()
|
||||
val queueName = myHostPort.toString()
|
||||
session.createQueue(address, queueName, false)
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
val queueName = toQueueName(myAddress)
|
||||
val query = session.queueQuery(queueName)
|
||||
if (!query.isExists) {
|
||||
session.createQueue(queueName, queueName, persistentInbox)
|
||||
}
|
||||
knownQueues.add(queueName)
|
||||
consumer = session.createConsumer(queueName)
|
||||
producer = session.createProducer()
|
||||
|
||||
session.start()
|
||||
}
|
||||
}
|
||||
@ -166,6 +181,7 @@ class ArtemisMessagingClient(directory: Path,
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
|
||||
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID")
|
||||
|
||||
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
|
||||
|
||||
@ -243,9 +259,10 @@ class ArtemisMessagingClient(directory: Path,
|
||||
shutdownLatch.await()
|
||||
}
|
||||
state.locked {
|
||||
for (producer in producers.values) producer.close()
|
||||
producers.clear()
|
||||
|
||||
producer?.close()
|
||||
producer = null
|
||||
// Ensure any trailing messages are committed to the journal
|
||||
session!!.commit()
|
||||
// Closing the factory closes all the sessions it produced as well.
|
||||
clientFactory!!.close()
|
||||
clientFactory = null
|
||||
@ -253,9 +270,7 @@ class ArtemisMessagingClient(directory: Path,
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
if (target !is Address)
|
||||
TODO("Only simple sends to single recipients are currently implemented")
|
||||
|
||||
val queueName = toQueueName(target)
|
||||
state.locked {
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
@ -264,21 +279,20 @@ class ArtemisMessagingClient(directory: Path,
|
||||
writeBodyBufferBytes(message.data)
|
||||
}
|
||||
|
||||
val producer = producers.getOrPut(target) {
|
||||
if (target != myAddress)
|
||||
maybeCreateQueue(target.hostAndPort)
|
||||
session!!.createProducer(target.hostAndPort.toString())
|
||||
if (knownQueues.add(queueName)) {
|
||||
maybeCreateQueue(queueName)
|
||||
}
|
||||
producer.send(artemisMessage)
|
||||
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID}")
|
||||
producer!!.send(queueName, artemisMessage)
|
||||
}
|
||||
}
|
||||
|
||||
private fun maybeCreateQueue(hostAndPort: HostAndPort) {
|
||||
private fun maybeCreateQueue(queueName: SimpleString) {
|
||||
state.alreadyLocked {
|
||||
val name = hostAndPort.toString()
|
||||
val queueQuery = session!!.queueQuery(SimpleString(name))
|
||||
val queueQuery = session!!.queueQuery(queueName)
|
||||
if (!queueQuery.isExists) {
|
||||
session!!.createQueue(name, name, true /* durable */)
|
||||
log.info("create client queue $queueName")
|
||||
session!!.createQueue(queueName, queueName, true /* durable */)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,15 +2,20 @@ package com.r3corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.X509Utilities
|
||||
import com.r3corda.core.crypto.parsePublicKeyBase58
|
||||
import com.r3corda.core.crypto.toBase58String
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
* The base class for Artemis services that defines shared data structures and transport configuration
|
||||
@ -22,8 +27,69 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo
|
||||
private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
|
||||
private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
|
||||
|
||||
|
||||
companion object {
|
||||
const val PEERS_PREFIX = "peers."
|
||||
|
||||
@JvmStatic
|
||||
protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap")
|
||||
|
||||
/**
|
||||
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
|
||||
* only be used in unit tests and the internals of the messaging services to keep addressing opaque for the future.
|
||||
* N.B. Marked as JvmStatic to allow use in the inherited classes.
|
||||
*/
|
||||
@JvmStatic
|
||||
internal fun toHostAndPort(target: MessageRecipients): HostAndPort {
|
||||
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
|
||||
return addr.hostAndPort
|
||||
}
|
||||
|
||||
/**
|
||||
* Assuming the passed in target address is actually an ArtemisAddress will extract the queue name used.
|
||||
* For now the queue name is the Base58 version of the node's identity.
|
||||
* This should only be used in the internals of the messaging services to keep addressing opaque for the future.
|
||||
* N.B. Marked as JvmStatic to allow use in the inherited classes.
|
||||
*/
|
||||
@JvmStatic
|
||||
protected fun toQueueName(target: MessageRecipients): SimpleString {
|
||||
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
|
||||
return addr.queueName
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
protected interface ArtemisAddress {
|
||||
val queueName: SimpleString
|
||||
val hostAndPort: HostAndPort
|
||||
}
|
||||
|
||||
protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
|
||||
override val queueName: SimpleString = NETWORK_MAP_ADDRESS
|
||||
}
|
||||
|
||||
// In future: can contain onion routing info, etc.
|
||||
data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
|
||||
protected data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
|
||||
override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) }
|
||||
|
||||
override fun toString(): String {
|
||||
return "NodeAddress(identity = $queueName, $hostAndPort"
|
||||
}
|
||||
}
|
||||
|
||||
protected fun tryParseKeyFromQueueName(queueName: SimpleString): PublicKey? {
|
||||
val name = queueName.toString()
|
||||
if(!name.startsWith(PEERS_PREFIX)) {
|
||||
return null
|
||||
}
|
||||
val keyCode = name.substring(PEERS_PREFIX.length)
|
||||
return try {
|
||||
parsePublicKeyBase58(keyCode)
|
||||
} catch (ex: Exception) {
|
||||
null
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected enum class ConnectionDirection { INBOUND, OUTBOUND }
|
||||
|
||||
|
@ -3,9 +3,11 @@ package com.r3corda.node.services.messaging
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.newSecureRandom
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
import org.apache.activemq.artemis.core.config.Configuration
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
@ -15,6 +17,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
|
||||
import rx.Subscription
|
||||
import java.math.BigInteger
|
||||
import java.nio.file.Path
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -35,7 +38,8 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingServer(directory: Path,
|
||||
config: NodeConfiguration,
|
||||
val myHostPort: HostAndPort) : ArtemisMessagingComponent(directory, config) {
|
||||
val myHostPort: HostAndPort,
|
||||
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent(directory, config) {
|
||||
companion object {
|
||||
val log = loggerFor<ArtemisMessagingServer>()
|
||||
}
|
||||
@ -44,22 +48,73 @@ class ArtemisMessagingServer(directory: Path,
|
||||
var running = false
|
||||
}
|
||||
|
||||
val myAddress: SingleMessageRecipient = Address(myHostPort)
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private lateinit var activeMQServer: ActiveMQServer
|
||||
private var networkChangeHandle: Subscription? = null
|
||||
|
||||
fun start() = mutex.locked {
|
||||
if (!running) {
|
||||
configureAndStartServer()
|
||||
networkChangeHandle = networkMapCache.changed.subscribe { onNetworkChange(it) }
|
||||
running = true
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() = mutex.locked {
|
||||
networkChangeHandle?.unsubscribe()
|
||||
networkChangeHandle = null
|
||||
activeMQServer.stop()
|
||||
running = false
|
||||
}
|
||||
|
||||
fun bridgeToNetworkMapService(networkMapService: NodeInfo?) {
|
||||
if ((networkMapService != null) && (networkMapService.address is NetworkMapAddress)) {
|
||||
val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS)
|
||||
if (!query.isExists) {
|
||||
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
|
||||
}
|
||||
|
||||
maybeDeployBridgeForAddress(NETWORK_MAP_ADDRESS, networkMapService)
|
||||
}
|
||||
}
|
||||
|
||||
private fun onNetworkChange(change: NetworkMapCache.MapChange) {
|
||||
val address = change.node.address
|
||||
if (address is ArtemisMessagingComponent.ArtemisAddress) {
|
||||
val queueName = address.queueName
|
||||
when (change.type) {
|
||||
NetworkMapCache.MapChangeType.Added -> {
|
||||
val query = activeMQServer.queueQuery(queueName)
|
||||
if (query.isExists) {
|
||||
// Queue exists so now wire up bridge
|
||||
maybeDeployBridgeForAddress(queueName, change.node)
|
||||
}
|
||||
}
|
||||
|
||||
NetworkMapCache.MapChangeType.Modified -> {
|
||||
(change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let {
|
||||
// remove any previous possibly different bridge
|
||||
maybeDestroyBridge(it.queueName)
|
||||
}
|
||||
val query = activeMQServer.queueQuery(queueName)
|
||||
if (query.isExists) {
|
||||
// Deploy new bridge
|
||||
maybeDeployBridgeForAddress(queueName, change.node)
|
||||
}
|
||||
}
|
||||
|
||||
NetworkMapCache.MapChangeType.Removed -> {
|
||||
(change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let {
|
||||
// Remove old bridge
|
||||
maybeDestroyBridge(it.queueName)
|
||||
}
|
||||
// just in case of NetworkMapCache version issues
|
||||
maybeDestroyBridge(queueName)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun configureAndStartServer() {
|
||||
val config = createArtemisConfig(directory, myHostPort).apply {
|
||||
securityRoles = mapOf(
|
||||
@ -74,8 +129,16 @@ class ArtemisMessagingServer(directory: Path,
|
||||
registerActivationFailureListener { exception -> throw exception }
|
||||
// Deploy bridge for a newly created queue
|
||||
registerPostQueueCreationCallback { queueName ->
|
||||
log.trace("Queue created: $queueName")
|
||||
maybeDeployBridgeForAddress(queueName.toString())
|
||||
log.info("Queue created: $queueName")
|
||||
if (queueName != NETWORK_MAP_ADDRESS) {
|
||||
val identity = tryParseKeyFromQueueName(queueName)
|
||||
if (identity != null) {
|
||||
val nodeInfo = networkMapCache.getNodeByPublicKey(identity)
|
||||
if (nodeInfo != null) {
|
||||
maybeDeployBridgeForAddress(queueName, nodeInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
activeMQServer.start()
|
||||
@ -102,35 +165,53 @@ class ArtemisMessagingServer(directory: Path,
|
||||
return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
|
||||
}
|
||||
|
||||
fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
|
||||
|
||||
fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration(
|
||||
hostAndPort.toString(),
|
||||
tcpTransport(
|
||||
ConnectionDirection.OUTBOUND,
|
||||
hostAndPort.hostText,
|
||||
hostAndPort.port
|
||||
)
|
||||
)
|
||||
|
||||
fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
|
||||
|
||||
fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) = activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
val nameStr = name.toString()
|
||||
setName(nameStr)
|
||||
queueName = nameStr
|
||||
forwardingAddress = nameStr
|
||||
staticConnectors = listOf(hostAndPort.toString())
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
})
|
||||
|
||||
/**
|
||||
* For every queue created we need to have a bridge deployed in case the address of the queue
|
||||
* is that of a remote party
|
||||
*/
|
||||
private fun maybeDeployBridgeForAddress(name: String) {
|
||||
val hostAndPort = HostAndPort.fromString(name)
|
||||
private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: NodeInfo) {
|
||||
val hostAndPort = toHostAndPort(nodeInfo.address)
|
||||
|
||||
fun connectorExists() = name in activeMQServer.configuration.connectorConfigurations
|
||||
if (hostAndPort == myHostPort) {
|
||||
return
|
||||
}
|
||||
|
||||
fun addConnector() = activeMQServer.configuration.addConnectorConfiguration(
|
||||
name,
|
||||
tcpTransport(
|
||||
ConnectionDirection.OUTBOUND,
|
||||
hostAndPort.hostText,
|
||||
hostAndPort.port
|
||||
)
|
||||
)
|
||||
if (!connectorExists(hostAndPort)) {
|
||||
log.info("add connector $hostAndPort")
|
||||
addConnector(hostAndPort)
|
||||
}
|
||||
|
||||
fun deployBridge() = activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
setName(name)
|
||||
queueName = name
|
||||
forwardingAddress = name
|
||||
staticConnectors = listOf(name)
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
})
|
||||
if (!bridgeExists(name)) {
|
||||
log.info("add bridge $hostAndPort $name")
|
||||
deployBridge(hostAndPort, name)
|
||||
}
|
||||
}
|
||||
|
||||
if (!connectorExists()) {
|
||||
addConnector()
|
||||
deployBridge()
|
||||
private fun maybeDestroyBridge(name: SimpleString) {
|
||||
if (bridgeExists(name)) {
|
||||
activeMQServer.destroyBridge(name.toString())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,13 +98,17 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
|
||||
}
|
||||
|
||||
override fun addNode(node: NodeInfo) {
|
||||
registeredNodes[node.identity] = node
|
||||
_changed.onNext(MapChange(node, MapChangeType.Added))
|
||||
val oldValue = registeredNodes.put(node.identity, node)
|
||||
if (oldValue == null) {
|
||||
_changed.onNext(MapChange(node, oldValue, MapChangeType.Added))
|
||||
} else if(oldValue != node) {
|
||||
_changed.onNext(MapChange(node, oldValue, MapChangeType.Modified))
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeNode(node: NodeInfo) {
|
||||
registeredNodes.remove(node.identity)
|
||||
_changed.onNext(MapChange(node, MapChangeType.Removed))
|
||||
val oldValue = registeredNodes.remove(node.identity)
|
||||
_changed.onNext(MapChange(node, oldValue, MapChangeType.Removed))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,7 +4,6 @@ import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.node.services.api.RegulatorService
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import com.r3corda.node.services.transactions.NotaryService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import org.junit.Test
|
||||
|
||||
@ -12,7 +11,7 @@ import org.junit.Test
|
||||
class DriverTests {
|
||||
companion object {
|
||||
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
|
||||
val address = nodeInfo.address as ArtemisMessagingComponent.Address
|
||||
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
|
||||
// Check that the node is registered in the network map
|
||||
poll("network map cache for $nodeName") {
|
||||
networkMapCache.get().firstOrNull {
|
||||
@ -20,13 +19,13 @@ class DriverTests {
|
||||
}
|
||||
}
|
||||
// Check that the port is bound
|
||||
addressMustBeBound(address.hostAndPort)
|
||||
addressMustBeBound(hostAndPort)
|
||||
}
|
||||
|
||||
fun nodeMustBeDown(nodeInfo: NodeInfo) {
|
||||
val address = nodeInfo.address as ArtemisMessagingComponent.Address
|
||||
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
|
||||
// Check that the port is bound
|
||||
addressMustNotBeBound(address.hostAndPort)
|
||||
addressMustNotBeBound(hostAndPort)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,12 +1,14 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.testing.freeLocalHostAndPort
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingServer
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
@ -25,6 +27,7 @@ class ArtemisMessagingTests {
|
||||
|
||||
val hostAndPort = freeLocalHostAndPort()
|
||||
val topic = "platform.self"
|
||||
val identity = generateKeyPair()
|
||||
val config = object : NodeConfiguration {
|
||||
override val myLegalName: String = "me"
|
||||
override val exportJMXto: String = ""
|
||||
@ -36,6 +39,8 @@ class ArtemisMessagingTests {
|
||||
var messagingClient: ArtemisMessagingClient? = null
|
||||
var messagingServer: ArtemisMessagingServer? = null
|
||||
|
||||
val networkMapCache = InMemoryNetworkMapCache()
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
messagingClient?.stop()
|
||||
@ -98,16 +103,15 @@ class ArtemisMessagingTests {
|
||||
assertNull(receivedMessages.poll(200, MILLISECONDS))
|
||||
}
|
||||
|
||||
private fun createMessagingClient(server: HostAndPort = hostAndPort,
|
||||
local: HostAndPort = hostAndPort): ArtemisMessagingClient {
|
||||
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply {
|
||||
private fun createMessagingClient(server: HostAndPort = hostAndPort): ArtemisMessagingClient {
|
||||
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, identity.public, AffinityExecutor.SAME_THREAD).apply {
|
||||
configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local).apply {
|
||||
return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local, networkMapCache).apply {
|
||||
configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
||||
|
@ -21,4 +21,4 @@ fi
|
||||
# Upload the rates to the buyer node
|
||||
curl -F rates=@scripts/example.rates.txt http://localhost:31338/upload/interest-rates
|
||||
|
||||
$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public --oracle=localhost --oracle-identity-file=build/trader-demo/buyer/identity-public
|
||||
$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public
|
@ -374,7 +374,7 @@ private fun runTrade(cliParams: CliParams.Trade): Int {
|
||||
|
||||
private fun createRecipient(addr: String): SingleMessageRecipient {
|
||||
val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT)
|
||||
return ArtemisMessagingClient.makeRecipient(hostAndPort)
|
||||
return ArtemisMessagingClient.makeNetworkMapAddress(hostAndPort)
|
||||
}
|
||||
|
||||
private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient): Node {
|
||||
|
@ -4,10 +4,10 @@ import com.google.common.net.HostAndPort
|
||||
import com.r3corda.contracts.asset.Cash
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.hours
|
||||
import com.r3corda.core.logElapsedTime
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.utilities.Emoji
|
||||
import com.r3corda.core.utilities.LogHelper
|
||||
@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory
|
||||
import java.math.BigDecimal
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.util.*
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("RatesFixDemo")
|
||||
@ -38,8 +39,6 @@ fun main(args: Array<String>) {
|
||||
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data")
|
||||
val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required()
|
||||
val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required()
|
||||
val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required()
|
||||
val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required()
|
||||
|
||||
val fixOfArg = parser.accepts("fix-of").withRequiredArg().defaultsTo("ICE LIBOR 2016-03-16 1M")
|
||||
val expectedRateArg = parser.accepts("expected-rate").withRequiredArg().defaultsTo("0.67")
|
||||
@ -56,28 +55,24 @@ fun main(args: Array<String>) {
|
||||
LogHelper.setLevel("+RatesFixDemo", "-org.apache.activemq")
|
||||
|
||||
val dir = Paths.get(options.valueOf(dirArg))
|
||||
val networkMapAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(networkMapAddrArg))
|
||||
val networkMapAddr = ArtemisMessagingClient.makeNetworkMapAddress(HostAndPort.fromString(options.valueOf(networkMapAddrArg)))
|
||||
val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>()
|
||||
val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type))
|
||||
|
||||
// Load oracle stuff (in lieu of having a network map service)
|
||||
val oracleAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(oracleAddrArg))
|
||||
val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>()
|
||||
val oracleNode = NodeInfo(oracleAddr, oracleIdentity)
|
||||
|
||||
val fixOf: FixOf = NodeInterestRates.parseFixOf(options.valueOf(fixOfArg))
|
||||
val expectedRate = BigDecimal(options.valueOf(expectedRateArg))
|
||||
val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg))
|
||||
|
||||
// Bring up node.
|
||||
val advertisedServices: Set<ServiceType> = emptySet()
|
||||
val myNetAddr = ArtemisMessagingClient.toHostAndPort(options.valueOf(networkAddressArg))
|
||||
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg))
|
||||
val config = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Rate fix demo node"
|
||||
override val exportJMXto: String = "http"
|
||||
override val nearestCity: String = "Atlantis"
|
||||
override val keyStorePassword: String = "cordacadevpass"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val dataSourceProperties: Properties = makeTestDataSourceProperties()
|
||||
}
|
||||
|
||||
val apiAddr = HostAndPort.fromParts(myNetAddr.hostText, myNetAddr.port + 1)
|
||||
@ -86,11 +81,12 @@ fun main(args: Array<String>) {
|
||||
advertisedServices, DemoClock()).setup().start() }
|
||||
node.networkMapRegistrationFuture.get()
|
||||
val notaryNode = node.services.networkMapCache.notaryNodes[0]
|
||||
val rateOracle = node.services.networkMapCache.ratesOracleNodes[0]
|
||||
|
||||
// Make a garbage transaction that includes a rate fix.
|
||||
val tx = TransactionType.General.Builder(notaryNode.identity)
|
||||
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity))
|
||||
val protocol = RatesFixProtocol(tx, oracleNode.identity, fixOf, expectedRate, rateTolerance)
|
||||
val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance)
|
||||
node.smm.add("demo.ratefix", protocol).get()
|
||||
node.stop()
|
||||
|
||||
|
@ -140,7 +140,7 @@ fun main(args: Array<String>) {
|
||||
val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public")
|
||||
val party = Files.readAllBytes(path).deserialize<Party>()
|
||||
advertisedServices = emptySet()
|
||||
NodeInfo(ArtemisMessagingClient.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
|
||||
NodeInfo(ArtemisMessagingClient.makeNetworkMapAddress(theirNetAddr), party, setOf(NetworkMapService.Type))
|
||||
}
|
||||
|
||||
// And now construct then start the node object. It takes a little while.
|
||||
|
Loading…
x
Reference in New Issue
Block a user