mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
Merged in mnesbit-cor-356-artemis-identity-as-address (pull request #301)
Only NetworkMapServer addresses can be publicly manufactured. Use identity public key as addressing, with only bridges using the HostAndPort information
This commit is contained in:
commit
4ac71bb97b
@ -20,8 +20,8 @@ interface NetworkMapCache {
|
||||
val logger = LoggerFactory.getLogger(NetworkMapCache::class.java)
|
||||
}
|
||||
|
||||
enum class MapChangeType { Added, Removed }
|
||||
data class MapChange(val node: NodeInfo, val type: MapChangeType )
|
||||
enum class MapChangeType { Added, Removed, Modified }
|
||||
data class MapChange(val node: NodeInfo, val prevNodeInfo: NodeInfo?, val type: MapChangeType )
|
||||
|
||||
/** A list of nodes that advertise a network map service */
|
||||
val networkMapNodes: List<NodeInfo>
|
||||
|
@ -63,9 +63,8 @@ interface DriverDSLExposedInterface {
|
||||
*
|
||||
* @param providedName name of the client, which will be used for creating its directory.
|
||||
* @param serverAddress the artemis server to connect to, for example a [Node].
|
||||
* @param clientAddress the address of the client (this is not bound by the client!), defaults to [serverAddress] if null.
|
||||
*/
|
||||
fun startClient(providedName: String, serverAddress: HostAndPort, clientAddress: HostAndPort?): Future<ArtemisMessagingClient>
|
||||
fun startClient(providedName: String, serverAddress: HostAndPort): Future<ArtemisMessagingClient>
|
||||
/**
|
||||
* Starts a local [ArtemisMessagingServer] of which there may only be one.
|
||||
*/
|
||||
@ -75,13 +74,12 @@ interface DriverDSLExposedInterface {
|
||||
}
|
||||
|
||||
fun DriverDSLExposedInterface.startClient(localServer: ArtemisMessagingServer) =
|
||||
startClient("driver-local-server-client", localServer.myHostPort, localServer.myHostPort)
|
||||
startClient("driver-local-server-client", localServer.myHostPort)
|
||||
|
||||
fun DriverDSLExposedInterface.startClient(remoteNodeInfo: NodeInfo, providedName: String? = null) =
|
||||
startClient(
|
||||
providedName = providedName ?: "${remoteNodeInfo.identity.name}-client",
|
||||
serverAddress = (remoteNodeInfo.address as ArtemisMessagingComponent.Address).hostAndPort,
|
||||
clientAddress = null
|
||||
serverAddress = ArtemisMessagingComponent.toHostAndPort(remoteNodeInfo.address)
|
||||
)
|
||||
|
||||
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
|
||||
@ -224,6 +222,7 @@ class DriverDSL(
|
||||
private val networkMapName = "NetworkMapService"
|
||||
private val networkMapAddress = portAllocation.nextHostAndPort()
|
||||
private var networkMapNodeInfo: NodeInfo? = null
|
||||
private val identity = generateKeyPair()
|
||||
|
||||
class State {
|
||||
val registeredProcesses = LinkedList<Process>()
|
||||
@ -322,8 +321,7 @@ class DriverDSL(
|
||||
|
||||
override fun startClient(
|
||||
providedName: String,
|
||||
serverAddress: HostAndPort,
|
||||
clientAddress: HostAndPort?
|
||||
serverAddress: HostAndPort
|
||||
): Future<ArtemisMessagingClient> {
|
||||
|
||||
val nodeConfiguration = NodeConfigurationFromConfig(
|
||||
@ -339,8 +337,9 @@ class DriverDSL(
|
||||
Paths.get(baseDirectory, providedName),
|
||||
nodeConfiguration,
|
||||
serverHostPort = serverAddress,
|
||||
myHostPort = clientAddress ?: serverAddress,
|
||||
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1)
|
||||
myIdentity = identity.public,
|
||||
executor = AffinityExecutor.ServiceAffinityExecutor(providedName, 1),
|
||||
persistentInbox = false // Do not create a permanent queue for our transient UI identity
|
||||
)
|
||||
|
||||
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingClient> {
|
||||
@ -368,7 +367,8 @@ class DriverDSL(
|
||||
val server = ArtemisMessagingServer(
|
||||
Paths.get(baseDirectory, name),
|
||||
config,
|
||||
portAllocation.nextHostAndPort()
|
||||
portAllocation.nextHostAndPort(),
|
||||
networkMapCache
|
||||
)
|
||||
return Executors.newSingleThreadExecutor().submit(Callable<ArtemisMessagingServer> {
|
||||
server.configureWithDevSSLCertificate()
|
||||
@ -383,11 +383,11 @@ class DriverDSL(
|
||||
|
||||
override fun start() {
|
||||
startNetworkMapService()
|
||||
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress, portAllocation.nextHostAndPort()).get()
|
||||
val networkMapClient = startClient("driver-$networkMapName-client", networkMapAddress).get()
|
||||
// We fake the network map's NodeInfo with a random public key in order to retrieve the correct NodeInfo from
|
||||
// the network map service itself.
|
||||
val fakeNodeInfo = NodeInfo(
|
||||
address = ArtemisMessagingClient.makeRecipient(networkMapAddress),
|
||||
address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress),
|
||||
identity = Party(
|
||||
name = networkMapName,
|
||||
owningKey = generateKeyPair().public
|
||||
|
@ -35,7 +35,7 @@ class NodeRunner {
|
||||
val networkMapNodeInfo =
|
||||
if (networkMapName != null && networkMapPublicKey != null && networkMapAddress != null) {
|
||||
NodeInfo(
|
||||
address = ArtemisMessagingClient.makeRecipient(networkMapAddress),
|
||||
address = ArtemisMessagingClient.makeNetworkMapAddress(networkMapAddress),
|
||||
identity = Party(
|
||||
name = networkMapName,
|
||||
owningKey = networkMapPublicKey
|
||||
|
@ -164,8 +164,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
val storageServices = initialiseStorageService(dir)
|
||||
storage = storageServices.first
|
||||
checkpointStorage = storageServices.second
|
||||
net = makeMessagingService()
|
||||
netMapCache = InMemoryNetworkMapCache()
|
||||
net = makeMessagingService()
|
||||
wallet = makeWalletService()
|
||||
|
||||
identity = makeIdentityService()
|
||||
|
@ -112,11 +112,16 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
|
||||
override fun makeMessagingService(): MessagingServiceInternal {
|
||||
val serverAddr = messagingServerAddr ?: {
|
||||
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr)
|
||||
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache)
|
||||
p2pAddr
|
||||
}()
|
||||
|
||||
return ArtemisMessagingClient(dir, configuration, serverAddr, p2pAddr, serverThread)
|
||||
if (networkMapService != null) {
|
||||
return ArtemisMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread)
|
||||
}
|
||||
else
|
||||
{
|
||||
return ArtemisMessagingClient(dir, configuration, serverAddr, null, serverThread)
|
||||
}
|
||||
}
|
||||
|
||||
override fun startMessagingService() {
|
||||
@ -124,6 +129,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
||||
messageBroker?.apply {
|
||||
configureWithDevSSLCertificate() // TODO: Create proper certificate provisioning process
|
||||
start()
|
||||
bridgeToNetworkMapService(networkMapService)
|
||||
}
|
||||
|
||||
// Start up the MQ client.
|
||||
|
@ -112,7 +112,7 @@ class FullNodeConfiguration(conf: Config) : NodeConfiguration {
|
||||
val clock: Clock = NodeClock()
|
||||
|
||||
fun createNode(): Node {
|
||||
val networkMapTarget = ArtemisMessagingClient.makeRecipient(mapService.address)
|
||||
val networkMapTarget = ArtemisMessagingClient.makeNetworkMapAddress(mapService.address)
|
||||
val advertisedServices = mutableSetOf<ServiceType>()
|
||||
if (mapService.hostServiceLocally) advertisedServices.add(NetworkMapService.Type)
|
||||
if (hostNotaryServiceLocally) advertisedServices.add(SimpleNotaryService.Type)
|
||||
|
@ -5,7 +5,6 @@ import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.serialization.opaque
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
@ -14,8 +13,8 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executor
|
||||
@ -31,14 +30,19 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* through into Artemis and from there, back through to senders.
|
||||
*
|
||||
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
|
||||
* @param myHostPort What host and port to use as an address for incoming messages
|
||||
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
|
||||
* that this is a NetworkMapService node which will be bound globally to the name "networkmap"
|
||||
* @param executor An executor to run received message tasks upon.
|
||||
* @param persistentInbox If true the inbox will be created persistent if not already created.
|
||||
* If false the inbox queue will be transient, which is appropriate for UI clients for example.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingClient(directory: Path,
|
||||
config: NodeConfiguration,
|
||||
val serverHostPort: HostAndPort,
|
||||
val myHostPort: HostAndPort,
|
||||
val executor: AffinityExecutor) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
|
||||
val myIdentity: PublicKey?,
|
||||
val executor: AffinityExecutor,
|
||||
val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
|
||||
companion object {
|
||||
val log = loggerFor<ArtemisMessagingClient>()
|
||||
|
||||
@ -50,17 +54,19 @@ class ArtemisMessagingClient(directory: Path,
|
||||
|
||||
val SESSION_ID_PROPERTY = "session-id"
|
||||
|
||||
/** Temp helper until network map is established. */
|
||||
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = Address(hostAndPort)
|
||||
|
||||
fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname))
|
||||
fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT)
|
||||
/**
|
||||
* This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node.
|
||||
* All other addresses come from the NetworkMapCache, or myAddress below.
|
||||
* The node will populate with their own identity based address when they register with the NetworkMapService.
|
||||
*/
|
||||
fun makeNetworkMapAddress(hostAndPort: HostAndPort): SingleMessageRecipient = NetworkMapAddress(hostAndPort)
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
var started = false
|
||||
var running = false
|
||||
val producers = HashMap<Address, ClientProducer>()
|
||||
val knownQueues = mutableSetOf<SimpleString>()
|
||||
var producer: ClientProducer? = null
|
||||
var consumer: ClientConsumer? = null
|
||||
var session: ClientSession? = null
|
||||
var clientFactory: ClientSessionFactory? = null
|
||||
@ -71,7 +77,10 @@ class ArtemisMessagingClient(directory: Path,
|
||||
val topicSession: TopicSession,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
override val myAddress: SingleMessageRecipient = Address(myHostPort)
|
||||
/**
|
||||
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
|
||||
*/
|
||||
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
@ -94,14 +103,20 @@ class ArtemisMessagingClient(directory: Path,
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
|
||||
clientFactory = locator.createSessionFactory()
|
||||
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
val session = clientFactory!!.createSession()
|
||||
// Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!)
|
||||
val session = clientFactory!!.createSession(true, true, 1)
|
||||
this.session = session
|
||||
|
||||
val address = myHostPort.toString()
|
||||
val queueName = myHostPort.toString()
|
||||
session.createQueue(address, queueName, false)
|
||||
// Create a queue on which to receive messages and set up the handler.
|
||||
val queueName = toQueueName(myAddress)
|
||||
val query = session.queueQuery(queueName)
|
||||
if (!query.isExists) {
|
||||
session.createQueue(queueName, queueName, persistentInbox)
|
||||
}
|
||||
knownQueues.add(queueName)
|
||||
consumer = session.createConsumer(queueName)
|
||||
producer = session.createProducer()
|
||||
|
||||
session.start()
|
||||
}
|
||||
}
|
||||
@ -166,6 +181,7 @@ class ArtemisMessagingClient(directory: Path,
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
|
||||
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID")
|
||||
|
||||
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
|
||||
|
||||
@ -243,9 +259,10 @@ class ArtemisMessagingClient(directory: Path,
|
||||
shutdownLatch.await()
|
||||
}
|
||||
state.locked {
|
||||
for (producer in producers.values) producer.close()
|
||||
producers.clear()
|
||||
|
||||
producer?.close()
|
||||
producer = null
|
||||
// Ensure any trailing messages are committed to the journal
|
||||
session!!.commit()
|
||||
// Closing the factory closes all the sessions it produced as well.
|
||||
clientFactory!!.close()
|
||||
clientFactory = null
|
||||
@ -253,9 +270,7 @@ class ArtemisMessagingClient(directory: Path,
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
if (target !is Address)
|
||||
TODO("Only simple sends to single recipients are currently implemented")
|
||||
|
||||
val queueName = toQueueName(target)
|
||||
state.locked {
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
@ -264,21 +279,20 @@ class ArtemisMessagingClient(directory: Path,
|
||||
writeBodyBufferBytes(message.data)
|
||||
}
|
||||
|
||||
val producer = producers.getOrPut(target) {
|
||||
if (target != myAddress)
|
||||
maybeCreateQueue(target.hostAndPort)
|
||||
session!!.createProducer(target.hostAndPort.toString())
|
||||
if (knownQueues.add(queueName)) {
|
||||
maybeCreateQueue(queueName)
|
||||
}
|
||||
producer.send(artemisMessage)
|
||||
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID}")
|
||||
producer!!.send(queueName, artemisMessage)
|
||||
}
|
||||
}
|
||||
|
||||
private fun maybeCreateQueue(hostAndPort: HostAndPort) {
|
||||
private fun maybeCreateQueue(queueName: SimpleString) {
|
||||
state.alreadyLocked {
|
||||
val name = hostAndPort.toString()
|
||||
val queueQuery = session!!.queueQuery(SimpleString(name))
|
||||
val queueQuery = session!!.queueQuery(queueName)
|
||||
if (!queueQuery.isExists) {
|
||||
session!!.createQueue(name, name, true /* durable */)
|
||||
log.info("create client queue $queueName")
|
||||
session!!.createQueue(queueName, queueName, true /* durable */)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,15 +2,20 @@ package com.r3corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.X509Utilities
|
||||
import com.r3corda.core.crypto.parsePublicKeyBase58
|
||||
import com.r3corda.core.crypto.toBase58String
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
* The base class for Artemis services that defines shared data structures and transport configuration
|
||||
@ -22,8 +27,69 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo
|
||||
private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
|
||||
private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
|
||||
|
||||
|
||||
companion object {
|
||||
const val PEERS_PREFIX = "peers."
|
||||
|
||||
@JvmStatic
|
||||
protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap")
|
||||
|
||||
/**
|
||||
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
|
||||
* only be used in unit tests and the internals of the messaging services to keep addressing opaque for the future.
|
||||
* N.B. Marked as JvmStatic to allow use in the inherited classes.
|
||||
*/
|
||||
@JvmStatic
|
||||
internal fun toHostAndPort(target: MessageRecipients): HostAndPort {
|
||||
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
|
||||
return addr.hostAndPort
|
||||
}
|
||||
|
||||
/**
|
||||
* Assuming the passed in target address is actually an ArtemisAddress will extract the queue name used.
|
||||
* For now the queue name is the Base58 version of the node's identity.
|
||||
* This should only be used in the internals of the messaging services to keep addressing opaque for the future.
|
||||
* N.B. Marked as JvmStatic to allow use in the inherited classes.
|
||||
*/
|
||||
@JvmStatic
|
||||
protected fun toQueueName(target: MessageRecipients): SimpleString {
|
||||
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
|
||||
return addr.queueName
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
protected interface ArtemisAddress {
|
||||
val queueName: SimpleString
|
||||
val hostAndPort: HostAndPort
|
||||
}
|
||||
|
||||
protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
|
||||
override val queueName: SimpleString = NETWORK_MAP_ADDRESS
|
||||
}
|
||||
|
||||
// In future: can contain onion routing info, etc.
|
||||
data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
|
||||
protected data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
|
||||
override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) }
|
||||
|
||||
override fun toString(): String {
|
||||
return "NodeAddress(identity = $queueName, $hostAndPort"
|
||||
}
|
||||
}
|
||||
|
||||
protected fun tryParseKeyFromQueueName(queueName: SimpleString): PublicKey? {
|
||||
val name = queueName.toString()
|
||||
if(!name.startsWith(PEERS_PREFIX)) {
|
||||
return null
|
||||
}
|
||||
val keyCode = name.substring(PEERS_PREFIX.length)
|
||||
return try {
|
||||
parsePublicKeyBase58(keyCode)
|
||||
} catch (ex: Exception) {
|
||||
null
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected enum class ConnectionDirection { INBOUND, OUTBOUND }
|
||||
|
||||
|
@ -3,9 +3,11 @@ package com.r3corda.node.services.messaging
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.newSecureRandom
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
import org.apache.activemq.artemis.core.config.Configuration
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
@ -15,6 +17,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
|
||||
import rx.Subscription
|
||||
import java.math.BigInteger
|
||||
import java.nio.file.Path
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
@ -35,7 +38,8 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingServer(directory: Path,
|
||||
config: NodeConfiguration,
|
||||
val myHostPort: HostAndPort) : ArtemisMessagingComponent(directory, config) {
|
||||
val myHostPort: HostAndPort,
|
||||
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent(directory, config) {
|
||||
companion object {
|
||||
val log = loggerFor<ArtemisMessagingServer>()
|
||||
}
|
||||
@ -44,22 +48,73 @@ class ArtemisMessagingServer(directory: Path,
|
||||
var running = false
|
||||
}
|
||||
|
||||
val myAddress: SingleMessageRecipient = Address(myHostPort)
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private lateinit var activeMQServer: ActiveMQServer
|
||||
private var networkChangeHandle: Subscription? = null
|
||||
|
||||
fun start() = mutex.locked {
|
||||
if (!running) {
|
||||
configureAndStartServer()
|
||||
networkChangeHandle = networkMapCache.changed.subscribe { onNetworkChange(it) }
|
||||
running = true
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() = mutex.locked {
|
||||
networkChangeHandle?.unsubscribe()
|
||||
networkChangeHandle = null
|
||||
activeMQServer.stop()
|
||||
running = false
|
||||
}
|
||||
|
||||
fun bridgeToNetworkMapService(networkMapService: NodeInfo?) {
|
||||
if ((networkMapService != null) && (networkMapService.address is NetworkMapAddress)) {
|
||||
val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS)
|
||||
if (!query.isExists) {
|
||||
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
|
||||
}
|
||||
|
||||
maybeDeployBridgeForAddress(NETWORK_MAP_ADDRESS, networkMapService)
|
||||
}
|
||||
}
|
||||
|
||||
private fun onNetworkChange(change: NetworkMapCache.MapChange) {
|
||||
val address = change.node.address
|
||||
if (address is ArtemisMessagingComponent.ArtemisAddress) {
|
||||
val queueName = address.queueName
|
||||
when (change.type) {
|
||||
NetworkMapCache.MapChangeType.Added -> {
|
||||
val query = activeMQServer.queueQuery(queueName)
|
||||
if (query.isExists) {
|
||||
// Queue exists so now wire up bridge
|
||||
maybeDeployBridgeForAddress(queueName, change.node)
|
||||
}
|
||||
}
|
||||
|
||||
NetworkMapCache.MapChangeType.Modified -> {
|
||||
(change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let {
|
||||
// remove any previous possibly different bridge
|
||||
maybeDestroyBridge(it.queueName)
|
||||
}
|
||||
val query = activeMQServer.queueQuery(queueName)
|
||||
if (query.isExists) {
|
||||
// Deploy new bridge
|
||||
maybeDeployBridgeForAddress(queueName, change.node)
|
||||
}
|
||||
}
|
||||
|
||||
NetworkMapCache.MapChangeType.Removed -> {
|
||||
(change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let {
|
||||
// Remove old bridge
|
||||
maybeDestroyBridge(it.queueName)
|
||||
}
|
||||
// just in case of NetworkMapCache version issues
|
||||
maybeDestroyBridge(queueName)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun configureAndStartServer() {
|
||||
val config = createArtemisConfig(directory, myHostPort).apply {
|
||||
securityRoles = mapOf(
|
||||
@ -74,8 +129,16 @@ class ArtemisMessagingServer(directory: Path,
|
||||
registerActivationFailureListener { exception -> throw exception }
|
||||
// Deploy bridge for a newly created queue
|
||||
registerPostQueueCreationCallback { queueName ->
|
||||
log.trace("Queue created: $queueName")
|
||||
maybeDeployBridgeForAddress(queueName.toString())
|
||||
log.info("Queue created: $queueName")
|
||||
if (queueName != NETWORK_MAP_ADDRESS) {
|
||||
val identity = tryParseKeyFromQueueName(queueName)
|
||||
if (identity != null) {
|
||||
val nodeInfo = networkMapCache.getNodeByPublicKey(identity)
|
||||
if (nodeInfo != null) {
|
||||
maybeDeployBridgeForAddress(queueName, nodeInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
activeMQServer.start()
|
||||
@ -102,35 +165,53 @@ class ArtemisMessagingServer(directory: Path,
|
||||
return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
|
||||
}
|
||||
|
||||
fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
|
||||
|
||||
fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration(
|
||||
hostAndPort.toString(),
|
||||
tcpTransport(
|
||||
ConnectionDirection.OUTBOUND,
|
||||
hostAndPort.hostText,
|
||||
hostAndPort.port
|
||||
)
|
||||
)
|
||||
|
||||
fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
|
||||
|
||||
fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) = activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
val nameStr = name.toString()
|
||||
setName(nameStr)
|
||||
queueName = nameStr
|
||||
forwardingAddress = nameStr
|
||||
staticConnectors = listOf(hostAndPort.toString())
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
})
|
||||
|
||||
/**
|
||||
* For every queue created we need to have a bridge deployed in case the address of the queue
|
||||
* is that of a remote party
|
||||
*/
|
||||
private fun maybeDeployBridgeForAddress(name: String) {
|
||||
val hostAndPort = HostAndPort.fromString(name)
|
||||
private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: NodeInfo) {
|
||||
val hostAndPort = toHostAndPort(nodeInfo.address)
|
||||
|
||||
fun connectorExists() = name in activeMQServer.configuration.connectorConfigurations
|
||||
if (hostAndPort == myHostPort) {
|
||||
return
|
||||
}
|
||||
|
||||
fun addConnector() = activeMQServer.configuration.addConnectorConfiguration(
|
||||
name,
|
||||
tcpTransport(
|
||||
ConnectionDirection.OUTBOUND,
|
||||
hostAndPort.hostText,
|
||||
hostAndPort.port
|
||||
)
|
||||
)
|
||||
if (!connectorExists(hostAndPort)) {
|
||||
log.info("add connector $hostAndPort")
|
||||
addConnector(hostAndPort)
|
||||
}
|
||||
|
||||
fun deployBridge() = activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
setName(name)
|
||||
queueName = name
|
||||
forwardingAddress = name
|
||||
staticConnectors = listOf(name)
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
})
|
||||
if (!bridgeExists(name)) {
|
||||
log.info("add bridge $hostAndPort $name")
|
||||
deployBridge(hostAndPort, name)
|
||||
}
|
||||
}
|
||||
|
||||
if (!connectorExists()) {
|
||||
addConnector()
|
||||
deployBridge()
|
||||
private fun maybeDestroyBridge(name: SimpleString) {
|
||||
if (bridgeExists(name)) {
|
||||
activeMQServer.destroyBridge(name.toString())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,13 +98,17 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
|
||||
}
|
||||
|
||||
override fun addNode(node: NodeInfo) {
|
||||
registeredNodes[node.identity] = node
|
||||
_changed.onNext(MapChange(node, MapChangeType.Added))
|
||||
val oldValue = registeredNodes.put(node.identity, node)
|
||||
if (oldValue == null) {
|
||||
_changed.onNext(MapChange(node, oldValue, MapChangeType.Added))
|
||||
} else if(oldValue != node) {
|
||||
_changed.onNext(MapChange(node, oldValue, MapChangeType.Modified))
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeNode(node: NodeInfo) {
|
||||
registeredNodes.remove(node.identity)
|
||||
_changed.onNext(MapChange(node, MapChangeType.Removed))
|
||||
val oldValue = registeredNodes.remove(node.identity)
|
||||
_changed.onNext(MapChange(node, oldValue, MapChangeType.Removed))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,7 +4,6 @@ import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.node.services.api.RegulatorService
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import com.r3corda.node.services.transactions.NotaryService
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import org.junit.Test
|
||||
|
||||
@ -12,7 +11,7 @@ import org.junit.Test
|
||||
class DriverTests {
|
||||
companion object {
|
||||
fun nodeMustBeUp(networkMapCache: NetworkMapCache, nodeInfo: NodeInfo, nodeName: String) {
|
||||
val address = nodeInfo.address as ArtemisMessagingComponent.Address
|
||||
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
|
||||
// Check that the node is registered in the network map
|
||||
poll("network map cache for $nodeName") {
|
||||
networkMapCache.get().firstOrNull {
|
||||
@ -20,13 +19,13 @@ class DriverTests {
|
||||
}
|
||||
}
|
||||
// Check that the port is bound
|
||||
addressMustBeBound(address.hostAndPort)
|
||||
addressMustBeBound(hostAndPort)
|
||||
}
|
||||
|
||||
fun nodeMustBeDown(nodeInfo: NodeInfo) {
|
||||
val address = nodeInfo.address as ArtemisMessagingComponent.Address
|
||||
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
|
||||
// Check that the port is bound
|
||||
addressMustNotBeBound(address.hostAndPort)
|
||||
addressMustNotBeBound(hostAndPort)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,12 +1,14 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.testing.freeLocalHostAndPort
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||
import com.r3corda.node.services.messaging.ArtemisMessagingServer
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.After
|
||||
@ -25,6 +27,7 @@ class ArtemisMessagingTests {
|
||||
|
||||
val hostAndPort = freeLocalHostAndPort()
|
||||
val topic = "platform.self"
|
||||
val identity = generateKeyPair()
|
||||
val config = object : NodeConfiguration {
|
||||
override val myLegalName: String = "me"
|
||||
override val exportJMXto: String = ""
|
||||
@ -36,6 +39,8 @@ class ArtemisMessagingTests {
|
||||
var messagingClient: ArtemisMessagingClient? = null
|
||||
var messagingServer: ArtemisMessagingServer? = null
|
||||
|
||||
val networkMapCache = InMemoryNetworkMapCache()
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
messagingClient?.stop()
|
||||
@ -98,16 +103,15 @@ class ArtemisMessagingTests {
|
||||
assertNull(receivedMessages.poll(200, MILLISECONDS))
|
||||
}
|
||||
|
||||
private fun createMessagingClient(server: HostAndPort = hostAndPort,
|
||||
local: HostAndPort = hostAndPort): ArtemisMessagingClient {
|
||||
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, local, AffinityExecutor.SAME_THREAD).apply {
|
||||
private fun createMessagingClient(server: HostAndPort = hostAndPort): ArtemisMessagingClient {
|
||||
return ArtemisMessagingClient(temporaryFolder.newFolder().toPath(), config, server, identity.public, AffinityExecutor.SAME_THREAD).apply {
|
||||
configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
}
|
||||
|
||||
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
|
||||
return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local).apply {
|
||||
return ArtemisMessagingServer(temporaryFolder.newFolder().toPath(), config, local, networkMapCache).apply {
|
||||
configureWithDevSSLCertificate()
|
||||
messagingServer = this
|
||||
}
|
||||
|
@ -21,4 +21,4 @@ fi
|
||||
# Upload the rates to the buyer node
|
||||
curl -F rates=@scripts/example.rates.txt http://localhost:31338/upload/interest-rates
|
||||
|
||||
$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public --oracle=localhost --oracle-identity-file=build/trader-demo/buyer/identity-public
|
||||
$bin --network-address=localhost:31300 --directory=build/trader-demo/rates-fix --network-map=localhost:31337 --network-map-identity-file=build/trader-demo/buyer/identity-public
|
@ -374,7 +374,7 @@ private fun runTrade(cliParams: CliParams.Trade): Int {
|
||||
|
||||
private fun createRecipient(addr: String): SingleMessageRecipient {
|
||||
val hostAndPort = HostAndPort.fromString(addr).withDefaultPort(Node.DEFAULT_PORT)
|
||||
return ArtemisMessagingClient.makeRecipient(hostAndPort)
|
||||
return ArtemisMessagingClient.makeNetworkMapAddress(hostAndPort)
|
||||
}
|
||||
|
||||
private fun startNode(params: CliParams.RunNode, networkMap: SingleMessageRecipient): Node {
|
||||
|
@ -4,10 +4,10 @@ import com.google.common.net.HostAndPort
|
||||
import com.r3corda.contracts.asset.Cash
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.hours
|
||||
import com.r3corda.core.logElapsedTime
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.testing.makeTestDataSourceProperties
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.utilities.Emoji
|
||||
import com.r3corda.core.utilities.LogHelper
|
||||
@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory
|
||||
import java.math.BigDecimal
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.util.*
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("RatesFixDemo")
|
||||
@ -38,8 +39,6 @@ fun main(args: Array<String>) {
|
||||
val dirArg = parser.accepts("directory").withRequiredArg().defaultsTo("rate-fix-demo-data")
|
||||
val networkMapAddrArg = parser.accepts("network-map").withRequiredArg().required()
|
||||
val networkMapIdentityArg = parser.accepts("network-map-identity-file").withRequiredArg().required()
|
||||
val oracleAddrArg = parser.accepts("oracle").withRequiredArg().required()
|
||||
val oracleIdentityArg = parser.accepts("oracle-identity-file").withRequiredArg().required()
|
||||
|
||||
val fixOfArg = parser.accepts("fix-of").withRequiredArg().defaultsTo("ICE LIBOR 2016-03-16 1M")
|
||||
val expectedRateArg = parser.accepts("expected-rate").withRequiredArg().defaultsTo("0.67")
|
||||
@ -56,28 +55,24 @@ fun main(args: Array<String>) {
|
||||
LogHelper.setLevel("+RatesFixDemo", "-org.apache.activemq")
|
||||
|
||||
val dir = Paths.get(options.valueOf(dirArg))
|
||||
val networkMapAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(networkMapAddrArg))
|
||||
val networkMapAddr = ArtemisMessagingClient.makeNetworkMapAddress(HostAndPort.fromString(options.valueOf(networkMapAddrArg)))
|
||||
val networkMapIdentity = Files.readAllBytes(Paths.get(options.valueOf(networkMapIdentityArg))).deserialize<Party>()
|
||||
val networkMapAddress = NodeInfo(networkMapAddr, networkMapIdentity, setOf(NetworkMapService.Type, NotaryService.Type))
|
||||
|
||||
// Load oracle stuff (in lieu of having a network map service)
|
||||
val oracleAddr = ArtemisMessagingClient.makeRecipient(options.valueOf(oracleAddrArg))
|
||||
val oracleIdentity = Files.readAllBytes(Paths.get(options.valueOf(oracleIdentityArg))).deserialize<Party>()
|
||||
val oracleNode = NodeInfo(oracleAddr, oracleIdentity)
|
||||
|
||||
val fixOf: FixOf = NodeInterestRates.parseFixOf(options.valueOf(fixOfArg))
|
||||
val expectedRate = BigDecimal(options.valueOf(expectedRateArg))
|
||||
val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg))
|
||||
|
||||
// Bring up node.
|
||||
val advertisedServices: Set<ServiceType> = emptySet()
|
||||
val myNetAddr = ArtemisMessagingClient.toHostAndPort(options.valueOf(networkAddressArg))
|
||||
val myNetAddr = HostAndPort.fromString(options.valueOf(networkAddressArg))
|
||||
val config = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Rate fix demo node"
|
||||
override val exportJMXto: String = "http"
|
||||
override val nearestCity: String = "Atlantis"
|
||||
override val keyStorePassword: String = "cordacadevpass"
|
||||
override val trustStorePassword: String = "trustpass"
|
||||
override val dataSourceProperties: Properties = makeTestDataSourceProperties()
|
||||
}
|
||||
|
||||
val apiAddr = HostAndPort.fromParts(myNetAddr.hostText, myNetAddr.port + 1)
|
||||
@ -86,11 +81,12 @@ fun main(args: Array<String>) {
|
||||
advertisedServices, DemoClock()).setup().start() }
|
||||
node.networkMapRegistrationFuture.get()
|
||||
val notaryNode = node.services.networkMapCache.notaryNodes[0]
|
||||
val rateOracle = node.services.networkMapCache.ratesOracleNodes[0]
|
||||
|
||||
// Make a garbage transaction that includes a rate fix.
|
||||
val tx = TransactionType.General.Builder(notaryNode.identity)
|
||||
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notaryNode.identity))
|
||||
val protocol = RatesFixProtocol(tx, oracleNode.identity, fixOf, expectedRate, rateTolerance)
|
||||
val protocol = RatesFixProtocol(tx, rateOracle.identity, fixOf, expectedRate, rateTolerance)
|
||||
node.smm.add("demo.ratefix", protocol).get()
|
||||
node.stop()
|
||||
|
||||
|
@ -140,7 +140,7 @@ fun main(args: Array<String>) {
|
||||
val path = Paths.get(baseDirectory, Role.BUYER.name.toLowerCase(), "identity-public")
|
||||
val party = Files.readAllBytes(path).deserialize<Party>()
|
||||
advertisedServices = emptySet()
|
||||
NodeInfo(ArtemisMessagingClient.makeRecipient(theirNetAddr), party, setOf(NetworkMapService.Type))
|
||||
NodeInfo(ArtemisMessagingClient.makeNetworkMapAddress(theirNetAddr), party, setOf(NetworkMapService.Type))
|
||||
}
|
||||
|
||||
// And now construct then start the node object. It takes a little while.
|
||||
|
Loading…
x
Reference in New Issue
Block a user