Add a client library that provides an RPC mechanism for talking to the Corda node.

The RPC mechanism uses message queues and is essentially conventional except for the fact that it supports marshalling observables. An observable encapsulates a stream of ongoing events, and server-side observables sent to the client are automatically bound to message queues and managed by Artemis.
This commit is contained in:
Mike Hearn
2016-08-26 15:31:17 +02:00
parent 25daa7d688
commit 4d83f1489f
21 changed files with 1191 additions and 97 deletions

View File

@ -64,6 +64,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
val messagingServerAddr: HostAndPort? = null) : AbstractNode(dir, configuration, networkMapAddress, advertisedServices, clock) {
companion object {
/** The port that is used by default if none is specified. As you know, 31337 is the most elite number. */
@JvmField
val DEFAULT_PORT = 31337
}
@ -122,10 +123,11 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
messageBroker = ArtemisMessagingServer(dir, configuration, p2pAddr, services.networkMapCache)
p2pAddr
}()
val ops = ServerRPCOps(services)
if (networkMapService != null) {
return NodeMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread)
return NodeMessagingClient(dir, configuration, serverAddr, services.storageService.myLegalIdentityKey.public, serverThread, rpcOps = ops)
} else {
return NodeMessagingClient(dir, configuration, serverAddr, null, serverThread)
return NodeMessagingClient(dir, configuration, serverAddr, null, serverThread, rpcOps = ops)
}
}

View File

@ -0,0 +1,14 @@
package com.r3corda.node.internal
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.messaging.CordaRPCOps
/**
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
* thread (i.e. serially). Arguments are serialised and deserialised automatically.
*/
class ServerRPCOps(services: ServiceHubInternal) : CordaRPCOps {
override val protocolVersion: Int = 0
// TODO: Add useful RPCs for client apps (examining the vault, etc)
}

View File

@ -8,7 +8,7 @@ import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
interface MessagingServiceInternal: MessagingService {
interface MessagingServiceInternal : MessagingService {
/**
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
* then this will block until all in-flight messages have finished being handled and acknowledged. If called

View File

@ -1,10 +1,7 @@
package com.r3corda.node.services.config
import com.google.common.net.HostAndPort
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.ServiceType
import com.r3corda.node.internal.Node
import com.r3corda.node.serialization.NodeClock
@ -25,12 +22,17 @@ import java.util.*
import kotlin.reflect.KProperty
import kotlin.reflect.jvm.javaType
interface NodeConfiguration {
interface NodeSSLConfiguration {
val keyStorePassword: String
val trustStorePassword: String
// TODO: Move cert paths into this interface as well.
}
interface NodeConfiguration : NodeSSLConfiguration {
val myLegalName: String
val exportJMXto: String
val nearestCity: String
val keyStorePassword: String
val trustStorePassword: String
val dataSourceProperties: Properties get() = Properties()
companion object {

View File

@ -5,10 +5,12 @@ import com.google.common.net.HostAndPort
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.crypto.parsePublicKeyBase58
import com.r3corda.core.crypto.toBase58String
import com.r3corda.core.div
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.core.use
import com.r3corda.node.services.config.NodeSSLConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
@ -16,21 +18,27 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Files
import java.nio.file.Path
import java.security.KeyStore
import java.security.PublicKey
/**
* The base class for Artemis services that defines shared data structures and transport configuration
*
* @param directory A place where Artemis can stash its message journal and other files.
* @param certificatePath A place where Artemis can stash its message journal and other files.
* @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore
*/
abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeConfiguration) : SingletonSerializeAsToken() {
private val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
private val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
abstract class ArtemisMessagingComponent(val certificatePath: Path, val config: NodeSSLConfiguration) : SingletonSerializeAsToken() {
val keyStorePath: Path = certificatePath / "sslkeystore.jks"
val trustStorePath: Path = certificatePath / "truststore.jks"
companion object {
init {
System.setProperty("org.jboss.logging.provider", "slf4j")
}
const val PEERS_PREFIX = "peers."
const val CLIENTS_PREFIX = "clients."
const val RPC_REQUESTS_QUEUE = "rpc.requests"
@JvmStatic
protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap")
@ -70,8 +78,12 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo
override val queueName: SimpleString = NETWORK_MAP_ADDRESS
}
// In future: can contain onion routing info, etc.
protected data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
/**
* This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class
* may change or evolve and code that relies upon it being a simple host/port may not function correctly.
* For instance it may contain onion routing data.
*/
data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) }
override fun toString(): String {
@ -79,18 +91,9 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo
}
}
protected fun tryParseKeyFromQueueName(queueName: SimpleString): PublicKey? {
val name = queueName.toString()
if(!name.startsWith(PEERS_PREFIX)) {
return null
}
val keyCode = name.substring(PEERS_PREFIX.length)
return try {
parsePublicKeyBase58(keyCode)
} catch (ex: Exception) {
null
}
protected fun parseKeyFromQueueName(name: String): PublicKey {
require(name.startsWith(PEERS_PREFIX))
return parsePublicKeyBase58(name.substring(PEERS_PREFIX.length))
}
protected enum class ConnectionDirection { INBOUND, OUTBOUND }
@ -105,7 +108,21 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_RSA_WITH_AES_128_GCM_SHA256",
"TLS_DHE_DSS_WITH_AES_128_GCM_SHA256")
"TLS_DHE_DSS_WITH_AES_128_GCM_SHA256"
)
/**
* Returns nothing if the keystore was opened OK or throws if not. Useful to check the password, as
* unfortunately Artemis tends to bury the exception when the password is wrong.
*/
fun checkStorePasswords() {
keyStorePath.use {
KeyStore.getInstance("JKS").load(it, config.keyStorePassword.toCharArray())
}
trustStorePath.use {
KeyStore.getInstance("JKS").load(it, config.trustStorePassword.toCharArray())
}
}
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
TransportConfiguration(
@ -144,11 +161,7 @@ abstract class ArtemisMessagingComponent(val directory: Path, val config: NodeCo
* the CA certs in Node resources. Then provision KeyStores into certificates folder under node path.
*/
fun configureWithDevSSLCertificate() {
val keyStorePath = directory.resolve("certificates").resolve("sslkeystore.jks")
val trustStorePath = directory.resolve("certificates").resolve("truststore.jks")
Files.createDirectories(directory.resolve("certificates"))
Files.createDirectories(certificatePath)
if (!Files.exists(trustStorePath)) {
Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"),
trustStorePath)

View File

@ -2,6 +2,7 @@ package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.AddressFormatException
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
@ -117,7 +118,7 @@ class ArtemisMessagingServer(directory: Path,
}
private fun configureAndStartServer() {
val config = createArtemisConfig(directory, myHostPort).apply {
val config = createArtemisConfig(certificatePath, myHostPort).apply {
securityRoles = mapOf(
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
)
@ -128,19 +129,32 @@ class ArtemisMessagingServer(directory: Path,
activeMQServer = ActiveMQServerImpl(config, securityManager).apply {
// Throw any exceptions which are detected during startup
registerActivationFailureListener { exception -> throw exception }
// Deploy bridge for a newly created queue
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
registerPostQueueCreationCallback { queueName ->
log.info("Queue created: $queueName")
if (queueName != NETWORK_MAP_ADDRESS) {
val identity = tryParseKeyFromQueueName(queueName)
if (identity != null) {
log.debug("Queue created: $queueName")
if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS) {
try {
val identity = parseKeyFromQueueName(queueName.toString())
val nodeInfo = networkMapCache.getNodeByPublicKey(identity)
if (nodeInfo != null) {
maybeDeployBridgeForAddress(queueName, nodeInfo.address)
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Protocol violation: Could not parse queue name as Base 58: $queueName")
}
}
}
registerPostQueueDeletionCallback { address, qName ->
if (qName == address)
log.debug("Queue deleted: $qName")
else
log.debug("Queue deleted: $qName for $address")
}
}
activeMQServer.start()
}
@ -148,7 +162,6 @@ class ArtemisMessagingServer(directory: Path,
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
val config = ConfigurationImpl()
setConfigDirectories(config, directory)
// We will be talking to our server purely in memory.
config.acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
)
@ -166,9 +179,9 @@ class ArtemisMessagingServer(directory: Path,
return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
}
fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
private fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration(
private fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration(
hostAndPort.toString(),
tcpTransport(
ConnectionDirection.OUTBOUND,
@ -177,37 +190,32 @@ class ArtemisMessagingServer(directory: Path,
)
)
fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) = activeMQServer.deployBridge(BridgeConfiguration().apply {
val nameStr = name.toString()
setName(nameStr)
queueName = nameStr
forwardingAddress = nameStr
staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
})
private fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
val nameStr = name.toString()
setName(nameStr)
queueName = nameStr
forwardingAddress = nameStr
staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
})
}
/**
* For every queue created we need to have a bridge deployed in case the address of the queue
* is that of a remote party
* is that of a remote party.
*/
private fun maybeDeployBridgeForAddress(name: SimpleString, address: SingleMessageRecipient) {
val hostAndPort = toHostAndPort(address)
if (hostAndPort == myHostPort) {
private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: SingleMessageRecipient) {
require(name.startsWith(PEERS_PREFIX))
val hostAndPort = toHostAndPort(nodeInfo)
if (hostAndPort == myHostPort)
return
}
if (!connectorExists(hostAndPort)) {
log.info("add connector $hostAndPort")
if (!connectorExists(hostAndPort))
addConnector(hostAndPort)
}
if (!bridgeExists(name)) {
log.info("add bridge $hostAndPort $name")
if (!bridgeExists(name))
deployBridge(hostAndPort, name)
}
}
private fun maybeDestroyBridge(name: SimpleString) {

View File

@ -0,0 +1,11 @@
package com.r3corda.node.services.messaging
import rx.Observable
/**
* RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [ServerRPCOps] class.
*/
interface CordaRPCOps : RPCOps {
// TODO: Add useful RPCs for client apps (examining the vault, etc)
}

View File

@ -2,7 +2,9 @@ package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.div
import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.opaque
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.MessagingServiceInternal
@ -21,6 +23,8 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
/**
* This class implements the [MessagingService] API using Apache Artemis, the successor to their ActiveMQ product.
* Artemis is a message queue broker and here we run a client connecting to the specified broker instance
@ -30,6 +34,10 @@ import javax.annotation.concurrent.ThreadSafe
* are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given
* executor through into Artemis and from there, back through to senders.
*
* An implementation of [CordaRPCOps] can be provided. If given, clients using the CordaMQClient RPC library can
* invoke methods on the provided implementation. There is more documentation on this in the docsite and the
* CordaRPCClient class.
*
* @param serverHostPort The address of the broker instance to connect to (might be running in the same process)
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
* that this is a NetworkMapService node which will be bound globally to the name "networkmap"
@ -43,7 +51,9 @@ class NodeMessagingClient(directory: Path,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val executor: AffinityExecutor,
val persistentInbox: Boolean = true) : ArtemisMessagingComponent(directory, config), MessagingServiceInternal {
val persistentInbox: Boolean = true,
private val rpcOps: CordaRPCOps? = null)
: ArtemisMessagingComponent(directory / "certificates", config), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
@ -68,9 +78,12 @@ class NodeMessagingClient(directory: Path,
var running = false
val knownQueues = mutableSetOf<SimpleString>()
var producer: ClientProducer? = null
var consumer: ClientConsumer? = null
var p2pConsumer: ClientConsumer? = null
var session: ClientSession? = null
var clientFactory: ClientSessionFactory? = null
// Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
var undeliveredMessages = listOf<Message>()
@ -99,7 +112,7 @@ class NodeMessagingClient(directory: Path,
started = true
log.info("Connecting to server: $serverHostPort")
// Connect to our server.
// Connect to our server. TODO: This should use the in-VM transport.
val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
clientFactory = locator.createSessionFactory()
@ -107,30 +120,43 @@ class NodeMessagingClient(directory: Path,
// Create a session and configure to commit manually after each acknowledge. (N.B. ackBatchSize is in Bytes!!!)
val session = clientFactory!!.createSession(true, true, 1)
this.session = session
session.start()
// Create a queue on which to receive messages and set up the handler.
// Create a general purpose producer.
producer = session.createProducer()
// Create a queue, consumer and producer for handling P2P network messages.
val queueName = toQueueName(myAddress)
val query = session.queueQuery(queueName)
if (!query.isExists) {
session.createQueue(queueName, queueName, persistentInbox)
}
knownQueues.add(queueName)
consumer = session.createConsumer(queueName)
producer = session.createProducer()
p2pConsumer = session.createConsumer(queueName)
session.start()
// Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything
// and perhaps we should, but these queues are not worth persisting.
if (rpcOps != null) {
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
}
}
}
private var shutdownLatch = CountDownLatch(1)
/** Starts the event loop: this method only returns once [stop] has been called. */
/** Starts the p2p event loop: this method only returns once [stop] has been called. */
fun run() {
val consumer = state.locked {
check(started)
check(!running) { "run can't be called twice" }
running = true
consumer!!
// Optionally, start RPC dispatch.
dispatcher?.start(rpcConsumer!!, rpcNotificationConsumer!!, executor)
p2pConsumer!!
}
while (true) {
@ -254,13 +280,13 @@ class NodeMessagingClient(directory: Path,
// We allow stop() to be called without a run() in between, but it must have at least been started.
check(started)
val c = consumer ?: throw IllegalStateException("stop can't be called twice")
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
try {
c.close()
} catch(e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
consumer = null
p2pConsumer = null
val prevRunning = running
running = false
prevRunning
@ -272,6 +298,10 @@ class NodeMessagingClient(directory: Path,
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
if (running) {
state.locked {
rpcConsumer?.close()
rpcConsumer = null
rpcNotificationConsumer?.close()
rpcNotificationConsumer = null
producer?.close()
producer = null
// Ensure any trailing messages are committed to the journal
@ -305,7 +335,7 @@ class NodeMessagingClient(directory: Path,
state.alreadyLocked {
val queueQuery = session!!.queueQuery(queueName)
if (!queueQuery.isExists) {
log.info("create client queue $queueName")
log.info("Create fresh queue $queueName")
session!!.createQueue(queueName, queueName, true /* durable */)
}
}
@ -346,6 +376,17 @@ class NodeMessagingClient(directory: Path,
}
}
override fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message
= createMessage(TopicSession(topic, sessionID), data)
override fun createMessage(topic: String, sessionID: Long, data: ByteArray) = createMessage(TopicSession(topic, sessionID), data)
private fun createRPCDispatcher(ops: CordaRPCOps) = object : RPCDispatcher(ops) {
override fun send(bits: SerializedBytes<*>, toAddress: String) {
state.locked {
val msg = session!!.createMessage(false)
msg.writeBodyBufferBytes(bits.bits)
producer!!.send(toAddress, msg)
}
}
}
private val dispatcher = if (rpcOps != null) createRPCDispatcher(rpcOps) else null
}

View File

@ -0,0 +1,129 @@
package com.r3corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.collect.HashMultimap
import com.r3corda.core.ErrorOr
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.debug
import com.r3corda.node.utilities.AffinityExecutor
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import rx.Notification
import rx.Observable
import rx.Subscription
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.AtomicInteger
// TODO: Exposing the authenticated message sender.
/**
* Intended to service transient clients only (not p2p nodes) for short-lived, transient request/response pairs.
* If you need robustness, this is the wrong system. If you don't want a response, this is probably the
* wrong system (you could just send a message). If you want complex customisation of how requests/responses
* are handled, this is probably the wrong system.
*/
abstract class RPCDispatcher(val target: Any) {
private val methodTable = target.javaClass.declaredMethods.associateBy { it.name }
private val queueToSubscription = HashMultimap.create<String, Subscription>()
// Created afresh for every RPC that is annotated as returning observables. Every time an observable is
// encountered either in the RPC response or in an object graph that is being emitted by one of those
// observables, the handle counter is incremented and the server-side observable is subscribed to. The
// materialized observations are then sent to the queue the client created where they can be picked up.
//
// When the observables are deserialised on the client side, the handle is read from the byte stream and
// the queue is filtered to extract just those observations.
private inner class ObservableSerializer(private val toQName: String) : Serializer<Observable<Any>>() {
private val handleCounter = AtomicInteger()
override fun read(kryo: Kryo, input: Input, type: Class<Observable<Any>>): Observable<Any> {
throw UnsupportedOperationException("not implemented")
}
override fun write(kryo: Kryo, output: Output, obj: Observable<Any>) {
val handle = handleCounter.andIncrement
output.writeInt(handle, true)
// Observables can do three kinds of callback: "next" with a content object, "completed" and "error".
// Materializing the observable converts these three kinds of callback into a single stream of objects
// representing what happened, which is useful for us to send over the wire.
val subscription = obj.materialize().subscribe { materialised: Notification<out Any> ->
val newKryo = createRPCKryo(observableSerializer = this@ObservableSerializer)
val bits = MarshalledObservation(handle, materialised).serialize(newKryo)
rpcLog.debug("RPC sending observation: $materialised")
send(bits, toQName)
}
synchronized(queueToSubscription) {
queueToSubscription.put(toQName, subscription)
}
}
}
fun dispatch(msg: ClientRPCRequestMessage) {
val (argBytes, replyTo, observationsTo, name) = msg
val maybeArgs = argBytes.deserialize<Array<Any>>()
rpcLog.debug { "-> RPC -> $name(${maybeArgs.joinToString()}) [reply to $replyTo]" }
val response: ErrorOr<Any?> = ErrorOr.catch {
val method = methodTable[name] ?: throw RPCException("Received RPC for unknown method $name - possible client/server version skew?")
if (method.isAnnotationPresent(RPCReturnsObservables::class.java) && observationsTo == null)
throw RPCException("Received RPC without any destination for observations, but the RPC returns observables")
try {
method.invoke(target, *maybeArgs)
} catch (e: InvocationTargetException) {
throw e.cause!!
}
}
rpcLog.debug { "<- RPC <- $name = $response " }
val kryo = createRPCKryo(observableSerializer = if (observationsTo != null) ObservableSerializer(observationsTo) else null)
// Serialise, or send back a simple serialised ErrorOr structure if we couldn't do it.
val responseBits = try {
response.serialize(kryo)
} catch (e: KryoException) {
rpcLog.error("Failed to respond to inbound RPC $name", e)
ErrorOr.of(e).serialize(kryo)
}
send(responseBits, replyTo)
}
abstract fun send(bits: SerializedBytes<*>, toAddress: String)
fun start(rpcConsumer: ClientConsumer, rpcNotificationConsumer: ClientConsumer?, onExecutor: AffinityExecutor) {
rpcNotificationConsumer?.setMessageHandler { msg ->
val qName = msg.getStringProperty("_AMQ_RoutingName")
val subscriptions = synchronized(queueToSubscription) {
queueToSubscription.removeAll(qName)
}
if (subscriptions.isNotEmpty()) {
rpcLog.debug("Observable queue was deleted, unsubscribing: $qName")
subscriptions.forEach { it.unsubscribe() }
}
}
rpcConsumer.setMessageHandler { msg ->
msg.acknowledge()
// All RPCs run on the main server thread, in order to avoid running concurrently with
// potentially state changing requests from other nodes and each other. If we need to
// give better latency to client RPCs in future we could use an executor that supports
// job priorities.
onExecutor.execute {
try {
val rpcMessage = msg.toRPCRequestMessage()
dispatch(rpcMessage)
} catch(e: RPCException) {
rpcLog.warn("Received malformed client RPC message: ${e.message}")
rpcLog.trace("RPC exception", e)
} catch(e: Throwable) {
rpcLog.error("Uncaught exception when dispatching client RPC", e)
}
}
}
}
}

View File

@ -0,0 +1,142 @@
package com.r3corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.r3corda.core.ErrorOr
import com.r3corda.core.crypto.Party
import com.r3corda.core.serialization.*
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.guava.*
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.LoggerFactory
import rx.Notification
import rx.Observable
import java.time.Instant
import java.util.*
/** Global RPC logger */
val rpcLog by lazy { LoggerFactory.getLogger("com.r3corda.rpc") }
/** Used in the RPC wire protocol to wrap an observation with the handle of the observable it's intended for. */
data class MarshalledObservation(val forHandle: Int, val what: Notification<*>)
/**
* If an RPC is tagged with this annotation it may return one or more observables anywhere in its response graph.
* Calling such a method comes with consequences: it's slower, and consumes server side resources as observations
* will buffer up on the server until they're consumed by the client.
*/
@Target(AnnotationTarget.FUNCTION)
@MustBeDocumented
annotation class RPCReturnsObservables
/** Records the protocol version in which this RPC was added. */
@Target(AnnotationTarget.FUNCTION)
@MustBeDocumented
annotation class RPCSinceVersion(val version: Int)
/** The contents of an RPC request message, separated from the MQ layer. */
data class ClientRPCRequestMessage(
val args: SerializedBytes<Array<Any>>,
val replyToAddress: String,
val observationsToAddress: String?,
val methodName: String
) {
companion object {
const val REPLY_TO = "reply-to"
const val OBSERVATIONS_TO = "observations-to"
const val METHOD_NAME = "method-name"
}
}
/**
* Base interface that all RPC servers must implement. Note: in Corda there's only one RPC interface. This base
* interface is here in case we split the RPC system out into a separate library one day.
*/
interface RPCOps {
/** Returns the RPC protocol version. Exists since version 0 so guaranteed to be present. */
val protocolVersion: Int
}
/**
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked
* method.
*/
open class RPCException(msg: String, cause: Throwable?) : RuntimeException(msg, cause) {
constructor(msg: String) : this(msg, null)
class DeadlineExceeded(rpcName: String) : RPCException("Deadline exceeded on call to $rpcName")
}
/** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */
fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage {
fun ClientMessage.requiredString(name: String): String = getStringProperty(name) ?: throw RPCException("Malformed request message: missing $name property")
val methodName = requiredString(ClientRPCRequestMessage.METHOD_NAME)
// TODO: Look up the authenticated sender identity once we upgrade to Artemis 1.4 and use that instead.
// This current approach is insecure: one client could send an RPC with a reply-to address owned by
// another, although they'd have to be able to figure out the other client ID first.
// We also need that to figure out what RPCs are allowed.
val replyTo = requiredString(ClientRPCRequestMessage.REPLY_TO)
val observationsTo = getStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO)
val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
check(argBytes.isNotEmpty())
return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName)
}
// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
private class RPCKryo(private val observableSerializer: Serializer<Observable<Any>>? = null) : Kryo() {
init {
isRegistrationRequired = true
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
// no-arg constructor available.
instantiatorStrategy = Kryo.DefaultInstantiatorStrategy(StdInstantiatorStrategy())
register(Arrays.asList("").javaClass, ArraysAsListSerializer())
register(Instant::class.java, ReferencesAwareJavaSerializer)
register(SignedTransaction::class.java, ImmutableClassSerializer(SignedTransaction::class))
register(WireTransaction::class.java, WireTransactionSerializer)
register(SerializedBytes::class.java, SerializedBytesSerializer)
register(Party::class.java)
ImmutableListSerializer.registerSerializers(this)
ImmutableSetSerializer.registerSerializers(this)
ImmutableSortedSetSerializer.registerSerializers(this)
ImmutableMapSerializer.registerSerializers(this)
ImmutableMultimapSerializer.registerSerializers(this)
noReferencesWithin<WireTransaction>()
register(ErrorOr::class.java)
register(MarshalledObservation::class.java, ImmutableClassSerializer(MarshalledObservation::class))
register(Notification::class.java)
register(Notification.Kind::class.java)
register(kotlin.Pair::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(IllegalArgumentException::class.java)
register(RPCException::class.java)
register(Array<StackTraceElement>::class.java, object : Serializer<Array<StackTraceElement>>() {
override fun read(kryo: Kryo, input: Input, type: Class<Array<StackTraceElement>>): Array<StackTraceElement> = emptyArray()
override fun write(kryo: Kryo, output: Output, `object`: Array<StackTraceElement>) {}
})
register(Collections.unmodifiableList(emptyList<String>()).javaClass)
}
val observableRegistration: Registration? = if (observableSerializer != null) register(Observable::class.java, observableSerializer) else null
override fun getRegistration(type: Class<*>): Registration {
if (Observable::class.java.isAssignableFrom(type))
return observableRegistration ?: throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables")
return super.getRegistration(type)
}
}
fun createRPCKryo(observableSerializer: Serializer<Observable<Any>>? = null): Kryo = RPCKryo(observableSerializer)