Local RPC, demos use RPC, NODE has special privileges

This commit is contained in:
Andras Slemmer
2016-11-18 11:44:29 +00:00
parent bbba7d3f19
commit a601f0abf5
69 changed files with 544 additions and 342 deletions

View File

@ -11,7 +11,7 @@ import org.junit.Test
class DriverTests {
companion object {
fun nodeMustBeUp(nodeInfo: NodeInfo, nodeName: String) {
fun nodeMustBeUp(nodeInfo: NodeInfo) {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the port is bound
addressMustBeBound(hostAndPort)
@ -30,8 +30,8 @@ class DriverTests {
val notary = startNode("TestNotary", setOf(ServiceInfo(SimpleNotaryService.type)))
val regulator = startNode("Regulator", setOf(ServiceInfo(RegulatorService.type)))
nodeMustBeUp(notary.getOrThrow().nodeInfo, "TestNotary")
nodeMustBeUp(regulator.getOrThrow().nodeInfo, "Regulator")
nodeMustBeUp(notary.getOrThrow().nodeInfo)
nodeMustBeUp(regulator.getOrThrow().nodeInfo)
Pair(notary.getOrThrow(), regulator.getOrThrow())
}
nodeMustBeDown(notary.nodeInfo)
@ -42,7 +42,7 @@ class DriverTests {
fun startingNodeWithNoServicesWorks() {
val noService = driver {
val noService = startNode("NoService")
nodeMustBeUp(noService.getOrThrow().nodeInfo, "NoService")
nodeMustBeUp(noService.getOrThrow().nodeInfo)
noService.getOrThrow()
}
nodeMustBeDown(noService.nodeInfo)
@ -52,7 +52,7 @@ class DriverTests {
fun randomFreePortAllocationWorks() {
val nodeInfo = driver(portAllocation = PortAllocation.RandomFree()) {
val nodeInfo = startNode("NoService")
nodeMustBeUp(nodeInfo.getOrThrow().nodeInfo, "NoService")
nodeMustBeUp(nodeInfo.getOrThrow().nodeInfo)
nodeInfo.getOrThrow()
}
nodeMustBeDown(nodeInfo.nodeInfo)

View File

@ -2,12 +2,12 @@ package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort
import net.corda.client.impl.CordaRPCClientImpl
import net.corda.core.crypto.Party
import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.node.internal.Node
@ -18,7 +18,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOT
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.node.services.messaging.CordaRPCOps
import net.corda.node.services.messaging.CordaRPCClientImpl
import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest

View File

@ -11,6 +11,7 @@ import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
@ -21,6 +22,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.sendRequest
import net.corda.node.api.APIServer
import net.corda.node.services.api.*
@ -30,7 +32,6 @@ import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
@ -82,7 +83,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
CashCommand.IssueCash::class.java,
CashCommand.PayCash::class.java,
CashCommand.ExitCash::class.java
)
),
FinalityFlow::class.java to emptySet()
)
}
@ -340,8 +342,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
private fun buildPluginServices(tokenizableServices: MutableList<Any>): List<Any> {
val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins }
val serviceList = mutableListOf<Any>()
for (serviceClass in pluginServices) {
val service = serviceClass.getConstructor(PluginServiceHub::class.java).newInstance(services)
for (serviceConstructor in pluginServices) {
val service = serviceConstructor.apply(services)
serviceList.add(service)
tokenizableServices.add(service)
if (service is AcceptsFileUpload) {

View File

@ -2,23 +2,36 @@ package net.corda.node.internal
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.serialization.serialize
import net.corda.node.services.messaging.requirePermission
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.messaging.*
import net.corda.node.services.messaging.createRPCKryo
import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
import rx.Observable
import java.io.BufferedInputStream
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
import java.time.Instant
import java.time.LocalDateTime
/**
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
@ -51,8 +64,8 @@ class CordaRPCOpsImpl(
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
val (allStateMachines, changes) = smm.track()
return Pair(
allStateMachines.map { StateMachineInfo.fromFlowStateMachineImpl(it) },
changes.map { StateMachineUpdate.fromStateMachineChange(it) }
allStateMachines.map { stateMachineInfoFromFlowStateMachineImpl(it) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
)
}
@ -84,8 +97,41 @@ class CordaRPCOpsImpl(
val stateMachine = services.invokeFlowAsync(logicType, *args) as FlowStateMachineImpl<T>
return FlowHandle(
id = stateMachine.id,
progress = stateMachine.logic.progressTracker?.changes ?: Observable.empty<ProgressTracker.Change>(),
progress = stateMachine.logic.track()?.second ?: Observable.empty(),
returnValue = stateMachine.resultFuture.toObservable()
)
}
override fun attachmentExists(id: SecureHash) = services.storageService.attachments.openAttachment(id) != null
override fun uploadAttachment(jar: InputStream) = services.storageService.attachments.importAttachment(jar)
override fun currentNodeTime(): Instant = Instant.now(services.clock)
override fun partyFromKey(key: CompositeKey) = services.identityService.partyFromKey(key)
override fun partyFromName(name: String) = services.identityService.partyFromName(name)
companion object {
fun stateMachineInfoFromFlowStateMachineImpl(stateMachine: FlowStateMachineImpl<*>): StateMachineInfo {
return StateMachineInfo(
id = stateMachine.id,
flowLogicClassName = stateMachine.logic.javaClass.name,
progressTrackerStepAndUpdates = stateMachine.logic.track()
)
}
fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
return when (change.addOrRemove) {
AddOrRemove.ADD -> {
val stateMachineInfo = StateMachineInfo(
id = change.id,
flowLogicClassName = change.logic.javaClass.name,
progressTrackerStepAndUpdates = change.logic.track()
)
StateMachineUpdate.Added(stateMachineInfo)
}
AddOrRemove.REMOVE -> {
StateMachineUpdate.Removed(change.id)
}
}
}
}
}

View File

@ -2,12 +2,14 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.core.div
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.then
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
@ -17,8 +19,9 @@ import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.RPCOps
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
@ -50,6 +53,7 @@ import java.util.*
import javax.management.ObjectName
import javax.servlet.*
import kotlin.concurrent.thread
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
class ConfigurationException(message: String) : Exception(message)
@ -120,6 +124,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
override fun makeMessagingService(): MessagingServiceInternal {
userService = RPCUserServiceImpl(configuration)
val serverAddr = with(configuration) {
messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService)
@ -146,7 +151,8 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
net.start(rpcOps, userService)
}
private fun initWebServer(): Server {
// TODO: add flag to enable/disable webserver
private fun initWebServer(localRpc: CordaRPCOps): Server {
// Note that the web server handlers will all run concurrently, and not on the node thread.
val handlerCollection = HandlerCollection()
@ -167,7 +173,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
}
// API, data upload and download to services (attachments, rates oracles etc)
handlerCollection.addHandler(buildServletContextHandler())
handlerCollection.addHandler(buildServletContextHandler(localRpc))
val server = Server()
@ -204,7 +210,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
return server
}
private fun buildServletContextHandler(): ServletContextHandler {
private fun buildServletContextHandler(localRpc: CordaRPCOps): ServletContextHandler {
return ServletContextHandler().apply {
contextPath = "/"
setAttribute("node", this@Node)
@ -219,17 +225,11 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
val webAPIsOnClasspath = pluginRegistries.flatMap { x -> x.webApis }
for (webapi in webAPIsOnClasspath) {
log.info("Add plugin web API from attachment ${webapi.name}")
val constructor = try {
webapi.getConstructor(ServiceHub::class.java)
} catch (ex: NoSuchMethodException) {
log.error("Missing constructor ${webapi.name}(ServiceHub)")
continue
}
log.info("Add plugin web API from attachment $webapi")
val customAPI = try {
constructor.newInstance(services)
webapi.apply(localRpc)
} catch (ex: InvocationTargetException) {
log.error("Constructor ${webapi.name}(ServiceHub) threw an error: ", ex.targetException)
log.error("Constructor $webapi threw an error: ", ex.targetException)
continue
}
resourceConfig.register(customAPI)
@ -299,13 +299,20 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
super.initialiseDatabasePersistence(insideTransaction)
}
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
val client = CordaRPCClient(configuration.artemisAddress, configuration)
client.start(NODE_USER, NODE_USER)
return client.proxy()
}
override fun start(): Node {
alreadyRunningNodeCheck()
super.start()
// Only start the service API requests once the network map registration is complete
networkMapRegistrationFuture.then {
thread(name = "WebServer") {
networkMapRegistrationFuture.getOrThrow()
try {
webServer = initWebServer()
webServer = initWebServer(connectLocalRpcAsNodeUser())
} catch(ex: Exception) {
// TODO: We need to decide if this is a fatal error, given the API is unavailable, or whether the API
// is not critical and we continue anyway.

View File

@ -1,13 +1,14 @@
package net.corda.node.services
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.flows.NotaryChangeFlow
import net.corda.core.node.CordaPluginRegistry
import java.util.function.Function
object NotaryChange {
class Plugin : CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
override val servicePlugins = listOf(Function(::Service))
}
/**

View File

@ -28,5 +28,6 @@ data class User(val username: String, val password: String, val permissions: Set
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
}
fun <P : FlowLogic<*>> startFlowPermission(clazz: Class<P>) = "StartFlow.${clazz.name}"
fun startFlowPermission(className: String) = "StartFlow.$className"
fun <P : FlowLogic<*>> startFlowPermission(clazz: Class<P>) = startFlowPermission(clazz.name)
inline fun <reified P : FlowLogic<*>> startFlowPermission(): String = startFlowPermission(P::class.java)

View File

@ -184,7 +184,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[RPC_REQUESTS_QUEUE] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
for ((username) in userService.users) {
// TODO remove NODE_USER once webserver doesn't need it
val possibleClientUserNames = userService.users.map { it.username } + listOf(NODE_USER)
for (username in possibleClientUserNames) {
securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(
nodeInternalRole,
restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
@ -344,7 +346,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses
username
}
principals += UserPrincipal(validatedUser)
loginSucceeded = true

View File

@ -0,0 +1,115 @@
package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.services.config.NodeSSLConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import rx.Observable
import java.io.Closeable
import java.time.Duration
import javax.annotation.concurrent.ThreadSafe
/**
* An RPC client connects to the specified server and allows you to make calls to the server that perform various
* useful tasks. See the documentation for [proxy] or review the docsite to learn more about how this API works.
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfiguration) : Closeable, ArtemisMessagingComponent() {
// TODO: Certificate handling for clients needs more work.
private inner class State {
var running = false
lateinit var sessionFactory: ClientSessionFactory
lateinit var session: ClientSession
lateinit var clientImpl: CordaRPCClientImpl
}
private val state = ThreadBox(State())
/** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */
@Throws(ActiveMQException::class)
fun start(username: String, password: String) {
state.locked {
check(!running)
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.
// See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html
sessionFactory = serverLocator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()
clientImpl = CordaRPCClientImpl(session, state.lock, username)
running = true
}
Runtime.getRuntime().addShutdownHook(Thread {
close()
})
}
/** Shuts down the client and lets the server know it can free the used resources (in a nice way) */
override fun close() {
state.locked {
if (!running) return
session.close()
sessionFactory.close()
running = false
}
}
/**
* Returns a fresh proxy that lets you invoke RPCs on the server. Calls on it block, and if the server throws an
* exception then it will be rethrown on the client. Proxies are thread safe but only one RPC can be in flight at
* once. If you'd like to perform multiple RPCs in parallel, use this function multiple times to get multiple
* proxies.
*
* Creation of a proxy is a somewhat expensive operation that involves calls to the server, so if you want to do
* calls from many threads at once you should cache one proxy per thread and reuse them. This function itself is
* thread safe though so requires no extra synchronisation.
*
* RPC sends and receives are logged on the net.corda.rpc logger.
*
* By default there are no timeouts on calls. This is deliberate, RPCs without timeouts can survive restarts,
* maintenance downtime and moves of the server. RPCs can survive temporary losses or changes in client connectivity,
* like switching between wifi networks. You can specify a timeout on the level of a proxy. If a call times
* out it will throw [RPCException.Deadline].
*
* The [CordaRPCOps] defines what client RPCs are available. If an RPC returns an [Observable] anywhere in the
* object graph returned then the server-side observable is transparently linked to a messaging queue, and that
* queue linked to another observable on the client side here. *You are expected to use it*. The server will begin
* buffering messages immediately that it will expect you to drain by subscribing to the returned observer. You can
* opt-out of this by simply casting the [Observable] to [Closeable] or [AutoCloseable] and then calling the close
* method on it. You don't have to explicitly close the observable if you actually subscribe to it: it will close
* itself and free up the server-side resources either when the client or JVM itself is shutdown, or when there are
* no more subscribers to it. Once all the subscribers to a returned observable are unsubscribed, the observable is
* closed and you can't then re-subscribe again: you'll have to re-request a fresh observable with another RPC.
*
* The proxy and linked observables consume some small amount of resources on the server. It's OK to just exit your
* process and let the server clean up, but in a long running process where you only need something for a short
* amount of time it is polite to cast the objects to [Closeable] or [AutoCloseable] and close it when you are done.
* Finalizers are in place to warn you if you lose a reference to an unclosed proxy or observable.
*
* @throws RPCException if the server version is too low or if the server isn't reachable within the given time.
*/
@Throws(RPCException::class)
fun proxy(timeout: Duration? = null, minVersion: Int = 0): CordaRPCOps {
return state.locked {
check(running) { "Client must have been started first" }
clientImpl.proxyFor(CordaRPCOps::class.java, timeout, minVersion)
}
}
private fun finalize() {
state.locked {
if (running) {
rpcLog.warn("A CordaMQClient is being finalised whilst still running, did you forget to call close?")
close()
}
}
}
}

View File

@ -0,0 +1,336 @@
package net.corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.cache.CacheBuilder
import net.corda.core.ErrorOr
import net.corda.core.bufferUntilSubscribed
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.random63BitValue
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Closeable
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method
import java.lang.reflect.Proxy
import java.time.Duration
import java.util.*
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy
import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.withLock
import kotlin.reflect.jvm.javaMethod
/**
* Core RPC engine implementation, to learn how to use RPC you should be looking at [CordaRPCClient].
*
* # Design notes
*
* The way RPCs are handled is fairly standard except for the handling of observables. When an RPC might return
* an [Observable] it is specially tagged. This causes the client to create a new transient queue for the
* receiving of observables and their observations with a random ID in the name. This ID is sent to the server in
* a message header. All observations are sent via this single queue.
*
* The reason for doing it this way and not the more obvious approach of one-queue-per-observable is that we want
* the queues to be *transient*, meaning their lifetime in the broker is tied to the session that created them.
* A server side observable and its associated queue is not a cost-free thing, let alone the memory and resources
* needed to actually generate the observations themselves, therefore we want to ensure these cannot leak. A
* transient queue will be deleted automatically if the client session terminates, which by default happens on
* disconnect but can also be configured to happen after a short delay (this allows clients to e.g. switch IP
* address). On the server the deletion of the observations queue triggers unsubscription from the associated
* observables, which in turn may then be garbage collected.
*
* Creating a transient queue requires a roundtrip to the broker and thus doing an RPC that could return
* observables takes two server roundtrips instead of one. That's why we require RPCs to be marked with
* [RPCReturnsObservables] as needing this special treatment instead of always doing it.
*
* If the Artemis/JMS APIs allowed us to create transient queues assigned to someone else then we could
* potentially use a different design in which the node creates new transient queues (one per observable) on the
* fly. The client would then have to watch out for this and start consuming those queues as they were created.
*
* We use one queue per RPC because we don't know ahead of time how many observables the server might return and
* often the server doesn't know either, which pushes towards a single queue design, but at the same time the
* processing of observations returned by an RPC might be striped across multiple threads and we'd like
* backpressure management to not be scoped per client process but with more granularity. So we end up with
* a compromise where the unit of backpressure management is the response to a single RPC.
*
* TODO: Backpressure isn't propagated all the way through the MQ broker at the moment.
*/
class CordaRPCClientImpl(private val session: ClientSession,
private val sessionLock: ReentrantLock,
private val username: String) {
companion object {
private val closeableCloseMethod = Closeable::close.javaMethod
private val autocloseableCloseMethod = AutoCloseable::close.javaMethod
}
/**
* Builds a proxy for the given type, which must descend from [RPCOps].
*
* @see CordaRPCClient.proxy for more information about how to use the proxies.
*/
fun <T : RPCOps> proxyFor(rpcInterface: Class<T>, timeout: Duration? = null, minVersion: Int = 0): T {
sessionLock.withLock {
if (producer == null)
producer = session.createProducer()
}
val proxyImpl = RPCProxyHandler(timeout)
@Suppress("UNCHECKED_CAST")
val proxy = Proxy.newProxyInstance(rpcInterface.classLoader, arrayOf(rpcInterface, Closeable::class.java), proxyImpl) as T
proxyImpl.serverProtocolVersion = proxy.protocolVersion
if (minVersion > proxyImpl.serverProtocolVersion)
throw RPCException("Requested minimum protocol version $minVersion is higher than the server's supported protocol version (${proxyImpl.serverProtocolVersion})")
return proxy
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
//region RPC engine
//
// You can find docs on all this in the api doc for the proxyFor method, and in the docsite.
// Utility to quickly suck out the contents of an Artemis message. There's probably a more efficient way to
// do this.
private fun <T : Any> ClientMessage.deserialize(kryo: Kryo): T = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }.deserialize(kryo)
@GuardedBy("sessionLock")
private val addressToQueueObservables = CacheBuilder.newBuilder().build<String, QueuedObservable>()
private var producer: ClientProducer? = null
private inner class ObservableDeserializer(private val qName: String,
private val rpcName: String,
private val rpcLocation: Throwable) : Serializer<Observable<Any>>() {
override fun read(kryo: Kryo, input: Input, type: Class<Observable<Any>>): Observable<Any> {
val handle = input.readInt(true)
val ob = sessionLock.withLock {
addressToQueueObservables.getIfPresent(qName) ?: QueuedObservable(qName, rpcName, rpcLocation, this).apply {
addressToQueueObservables.put(qName, this)
}
}
val result = ob.getForHandle(handle)
rpcLog.debug { "Deserializing and connecting a new observable for $rpcName on $qName: $result" }
return result
}
override fun write(kryo: Kryo, output: Output, `object`: Observable<Any>) {
throw UnsupportedOperationException("not implemented")
}
}
/**
* The proxy class returned to the client is auto-generated on the fly by the java.lang.reflect Proxy
* infrastructure. The JDK Proxy class writes bytecode into memory for a class that implements the requested
* interfaces and then routes all method calls to the invoke method below in a conveniently reified form.
* We can then easily take the data about the method call and turn it into an RPC. This avoids the need
* for the compile-time code generation which is so common in RPC systems.
*/
@ThreadSafe
private inner class RPCProxyHandler(private val timeout: Duration?) : InvocationHandler, Closeable {
private val proxyId = random63BitValue()
private val consumer: ClientConsumer
var serverProtocolVersion = 0
init {
val proxyAddress = constructAddress(proxyId)
consumer = sessionLock.withLock {
session.createTemporaryQueue(proxyAddress, proxyAddress)
session.createConsumer(proxyAddress)
}
}
private fun constructAddress(addressId: Long) = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.$addressId"
@Synchronized
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
if (isCloseInvocation(method)) {
close()
return null
}
if (method.name == "toString" && args == null)
return "Client RPC proxy"
if (consumer.isClosed)
throw RPCException("RPC Proxy is closed")
// All invoked methods on the proxy end up here.
val location = Throwable()
rpcLog.debug {
val argStr = args?.joinToString() ?: ""
"-> RPC -> ${method.name}($argStr): ${method.returnType}"
}
checkMethodVersion(method)
// sendRequest may return a reconfigured Kryo if the method returns observables.
val kryo: Kryo = sendRequest(args, location, method) ?: createRPCKryo()
val next: ErrorOr<*> = receiveResponse(kryo, method, timeout)
rpcLog.debug { "<- RPC <- ${method.name} = $next" }
return unwrapOrThrow(next)
}
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun unwrapOrThrow(next: ErrorOr<*>): Any? {
val ex = next.error
if (ex != null) {
// Replace the stack trace because that's an implementation detail of the server that isn't so
// helpful to the user who wants to see where the error was on their side, and serialising stack
// frame objects is a bit annoying. We slice it here to avoid the invoke() machinery being exposed.
// The resulting exception looks like it was thrown from inside the called method.
(ex as java.lang.Throwable).stackTrace = java.lang.Throwable().stackTrace.let { it.sliceArray(1..it.size - 1) }
throw ex
} else {
return next.value
}
}
private fun receiveResponse(kryo: Kryo, method: Method, timeout: Duration?): ErrorOr<*> {
val artemisMessage: ClientMessage =
if (timeout == null)
consumer.receive() ?: throw ActiveMQObjectClosedException()
else
consumer.receive(timeout.toMillis()) ?: throw RPCException.DeadlineExceeded(method.name)
artemisMessage.acknowledge()
val next = artemisMessage.deserialize<ErrorOr<*>>(kryo)
return next
}
private fun sendRequest(args: Array<out Any>?, location: Throwable, method: Method): Kryo? {
// We could of course also check the return type of the method to see if it's Observable, but I'd
// rather haved the annotation be used consistently.
val returnsObservables = method.isAnnotationPresent(RPCReturnsObservables::class.java)
sessionLock.withLock {
val msg: ClientMessage = createMessage(method)
val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else null
val serializedArgs = try {
(args ?: emptyArray<Any?>()).serialize(createRPCKryo())
} catch (e: KryoException) {
throw RPCException("Could not serialize RPC arguments", e)
}
msg.writeBodyBufferBytes(serializedArgs.bytes)
producer!!.send(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, msg)
return kryo
}
}
private fun maybePrepareForObservables(location: Throwable, method: Method, msg: ClientMessage): Kryo {
// Create a temporary queue just for the emissions on any observables that are returned.
val observationsId = random63BitValue()
val observationsQueueName = constructAddress(observationsId)
session.createTemporaryQueue(observationsQueueName, observationsQueueName)
msg.putLongProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsId)
// And make sure that we deserialise observable handles so that they're linked to the right
// queue. Also record a bit of metadata for debugging purposes.
return createRPCKryo(observableSerializer = ObservableDeserializer(observationsQueueName, method.name, location))
}
private fun createMessage(method: Method): ClientMessage {
return session.createMessage(false).apply {
putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name)
putLongProperty(ClientRPCRequestMessage.REPLY_TO, proxyId)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
}
private fun checkMethodVersion(method: Method) {
val methodVersion = method.getAnnotation(RPCSinceVersion::class.java)?.version ?: 0
if (methodVersion > serverProtocolVersion)
throw UnsupportedOperationException("Method ${method.name} was added in RPC protocol version $methodVersion but the server is running $serverProtocolVersion")
}
private fun isCloseInvocation(method: Method) = method == closeableCloseMethod || method == autocloseableCloseMethod
override fun close() {
consumer.close()
sessionLock.withLock { session.deleteQueue(constructAddress(proxyId)) }
}
override fun toString() = "Corda RPC Proxy listening on queue ${constructAddress(proxyId)}"
}
/**
* When subscribed to, starts consuming from the given queue name and demultiplexing the observables being
* sent to it. The server queue is moved into in-memory buffers (one per attached server-side observable)
* until drained through a subscription. When the subscriptions are all gone, the server-side queue is deleted.
*/
@ThreadSafe
private inner class QueuedObservable(private val qName: String,
private val rpcName: String,
private val rpcLocation: Throwable,
private val observableDeserializer: ObservableDeserializer) {
private val root = PublishSubject.create<MarshalledObservation>()
private val rootShared = root.doOnUnsubscribe { close() }.share()
// This could be made more efficient by using a specialised IntMap
private val observables = HashMap<Int, Observable<Any>>()
private var consumer: ClientConsumer? = sessionLock.withLock { session.createConsumer(qName) }.setMessageHandler { deliver(it) }
@Synchronized
fun getForHandle(handle: Int): Observable<Any> {
return observables.getOrPut(handle) {
/**
* Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here.
*
* In particular doing it the other way around may result in the following edge case:
* The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*,
* before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing
* the underlying artemis queue, even though the second Observable was not even registered.
*
* The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller
* subscribes, which must be after full deserialisation and registering of all top level Observables.
*/
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().share()
}
}
private fun deliver(msg: ClientMessage) {
msg.acknowledge()
val kryo = createRPCKryo(observableSerializer = observableDeserializer)
val received: MarshalledObservation = msg.deserialize(kryo)
rpcLog.debug { "<- Observable [$rpcName] <- Received $received" }
synchronized(this) {
// Force creation of the buffer if it doesn't already exist.
getForHandle(received.forHandle)
root.onNext(received)
}
}
@Synchronized
fun close() {
rpcLog.debug("Closing queue observable for call to $rpcName : $qName")
consumer?.close()
consumer = null
sessionLock.withLock { session.deleteQueue(qName) }
}
@Suppress("UNUSED")
fun finalize() {
val c = synchronized(this) { consumer }
if (c != null) {
rpcLog.warn("A hot observable returned from an RPC ($rpcName) was never subscribed to or explicitly closed. " +
"This wastes server-side resources because it was queueing observations for retrieval. " +
"It is being closed now, but please adjust your code to cast the observable to AutoCloseable and then close it explicitly.", rpcLocation)
c.close()
}
}
}
//endregion
}

View File

@ -1,165 +0,0 @@
package net.corda.node.services.messaging
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import rx.Observable
data class StateMachineInfo(
val id: StateMachineRunId,
val flowLogicClassName: String,
val progressTrackerStepAndUpdates: Pair<String, Observable<String>>?
) {
companion object {
fun fromFlowStateMachineImpl(psm: FlowStateMachineImpl<*>): StateMachineInfo {
return StateMachineInfo(
id = psm.id,
flowLogicClassName = psm.logic.javaClass.simpleName,
progressTrackerStepAndUpdates = psm.logic.track()
)
}
}
}
sealed class StateMachineUpdate(val id: StateMachineRunId) {
class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id)
class Removed(id: StateMachineRunId) : StateMachineUpdate(id)
companion object {
fun fromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
return when (change.addOrRemove) {
AddOrRemove.ADD -> {
val stateMachineInfo = StateMachineInfo(
id = change.id,
flowLogicClassName = change.logic.javaClass.simpleName,
progressTrackerStepAndUpdates = change.logic.track()
)
StateMachineUpdate.Added(stateMachineInfo)
}
AddOrRemove.REMOVE -> {
StateMachineUpdate.Removed(change.id)
}
}
}
}
}
/**
* RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [CordaRPCOpsImpl] class.
*/
interface CordaRPCOps : RPCOps {
/**
* Returns a pair of currently in-progress state machine infos and an observable of future state machine adds/removes.
*/
@RPCReturnsObservables
fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>>
/**
* Returns a pair of head states in the vault and an observable of future updates to the vault.
*/
@RPCReturnsObservables
fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>>
/**
* Returns a pair of all recorded transactions and an observable of future recorded ones.
*/
@RPCReturnsObservables
fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>>
/**
* Returns a snapshot list of existing state machine id - recorded transaction hash mappings, and a stream of future
* such mappings as well.
*/
@RPCReturnsObservables
fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>
/**
* Returns all parties currently visible on the network with their advertised services and an observable of future updates to the network.
*/
@RPCReturnsObservables
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>
/**
* Start the given flow with the given arguments, returning an [Observable] with a single observation of the
* result of running the flow.
*/
@RPCReturnsObservables
fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>
/**
* Returns Node's identity, assuming this will not change while the node is running.
*/
fun nodeIdentity(): NodeInfo
/*
* Add note(s) to an existing Vault transaction
*/
fun addVaultTransactionNote(txnId: SecureHash, txnNote: String)
/*
* Retrieve existing note(s) for a given Vault transaction
*/
fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String>
}
/**
* These allow type safe invocations of flows from Kotlin, e.g.:
*
* val rpc: CordaRPCOps = (..)
* rpc.startFlow(::ResolveTransactionsFlow, setOf<SecureHash>(), aliceIdentity)
*
* Note that the passed in constructor function is only used for unification of other type parameters and reification of
* the Class instance of the flow. This could be changed to use the constructor function directly.
*/
inline fun <T : Any, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: () -> R
) = startFlowDynamic(R::class.java)
inline fun <T : Any, A, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: (A) -> R,
arg0: A
) = startFlowDynamic(R::class.java, arg0)
inline fun <T : Any, A, B, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: (A, B) -> R,
arg0: A,
arg1: B
) = startFlowDynamic(R::class.java, arg0, arg1)
inline fun <T : Any, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: (A, B, C) -> R,
arg0: A,
arg1: B,
arg2: C
) = startFlowDynamic(R::class.java, arg0, arg1, arg2)
inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
flowConstructor: (A, B, C, D) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D
) = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3)
data class FlowHandle<A>(
val id: StateMachineRunId,
val progress: Observable<ProgressTracker.Change>,
val returnValue: Observable<A>
)

View File

@ -158,7 +158,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
session.createTemporaryQueue(NOTIFICATIONS_ADDRESS, RPC_QUEUE_REMOVALS_QUEUE, "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE)
rpcDispatcher = createRPCDispatcher(rpcOps, userService)
rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName)
}
}
@ -436,16 +436,17 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService) = object : RPCDispatcher(ops, userService) {
override fun send(data: SerializedBytes<*>, toAddress: String) {
state.locked {
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService, nodeLegalName: String) =
object : RPCDispatcher(ops, userService, nodeLegalName) {
override fun send(data: SerializedBytes<*>, toAddress: String) {
state.locked {
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer!!.send(toAddress, msg)
}
}
producer!!.send(toAddress, msg)
}
}
}
}

View File

@ -8,16 +8,22 @@ import com.esotericsoftware.kryo.io.Output
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.HashMultimap
import net.corda.core.ErrorOr
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.node.services.RPCUserService
import net.corda.node.services.User
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.utilities.AffinityExecutor
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x500.style.BCStyle
import rx.Notification
import rx.Observable
import rx.Subscription
@ -30,7 +36,8 @@ import java.util.concurrent.atomic.AtomicInteger
* wrong system (you could just send a message). If you want complex customisation of how requests/responses
* are handled, this is probably the wrong system.
*/
abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) {
// TODO remove the nodeLegalName parameter once the webserver doesn't need special privileges
abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, val nodeLegalName: String) {
// Throw an exception if there are overloaded methods
private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() }
@ -153,9 +160,19 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) {
return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName, user)
}
// TODO remove this User once webserver doesn't need it
val nodeUser = User(NODE_USER, NODE_USER, setOf())
@VisibleForTesting
protected open fun getUser(message: ClientMessage): User {
return userService.getUser(message.requiredString(Message.HDR_VALIDATED_USER.toString()))!!
val validatedUser = message.requiredString(Message.HDR_VALIDATED_USER.toString())
val rpcUser = userService.getUser(validatedUser)
if (rpcUser != null) {
return rpcUser
} else if (X500Name(validatedUser).getRDNs(BCStyle.CN).first().first.value.toString() == nodeLegalName) {
return nodeUser
} else {
throw IllegalArgumentException("Validated user '$validatedUser' is not an RPC user nor the NODE user")
}
}
private fun ClientMessage.getReturnAddress(user: User, property: String, required: Boolean): String? {

View File

@ -20,6 +20,9 @@ import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.*
@ -28,6 +31,7 @@ import net.corda.core.transactions.WireTransaction
import net.corda.flows.CashFlowResult
import net.corda.node.internal.AbstractNode
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
@ -37,6 +41,7 @@ import org.slf4j.LoggerFactory
import rx.Notification
import rx.Observable
import java.time.Instant
import java.time.LocalDateTime
import java.util.*
/** Global RPC logger */
@ -45,15 +50,6 @@ val rpcLog: Logger by lazy { LoggerFactory.getLogger("net.corda.rpc") }
/** Used in the RPC wire protocol to wrap an observation with the handle of the observable it's intended for. */
data class MarshalledObservation(val forHandle: Int, val what: Notification<*>)
/**
* If an RPC is tagged with this annotation it may return one or more observables anywhere in its response graph.
* Calling such a method comes with consequences: it's slower, and consumes server side resources as observations
* will buffer up on the server until they're consumed by the client.
*/
@Target(AnnotationTarget.FUNCTION)
@MustBeDocumented
annotation class RPCReturnsObservables
/** Records the protocol version in which this RPC was added. */
@Target(AnnotationTarget.FUNCTION)
@MustBeDocumented
@ -74,15 +70,6 @@ data class ClientRPCRequestMessage(
}
}
/**
* Base interface that all RPC servers must implement. Note: in Corda there's only one RPC interface. This base
* interface is here in case we split the RPC system out into a separate library one day.
*/
interface RPCOps {
/** Returns the RPC protocol version. Exists since version 0 so guaranteed to be present. */
val protocolVersion: Int
}
/**
* This is available to RPC implementations to query the validated [User] that is calling it. Each user has a set of
* permissions they're entitled to which can be used to control access.
@ -92,8 +79,11 @@ val CURRENT_RPC_USER: ThreadLocal<User> = ThreadLocal()
/** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */
fun requirePermission(permission: String) {
if (permission !in CURRENT_RPC_USER.get().permissions) {
throw PermissionException("User not permissioned for $permission")
// TODO remove the NODE_USER condition once webserver doesn't need it
val currentUser = CURRENT_RPC_USER.get()
val currentUserPermissions = currentUser.permissions
if (currentUser.username != NODE_USER && permission !in currentUserPermissions) {
throw PermissionException("User not permissioned for $permission, permissions are $currentUserPermissions")
}
}
@ -236,6 +226,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(FlowHandle::class.java)
register(KryoException::class.java)
register(StringBuffer::class.java)
register(Unit::class.java)
for ((_flow, argumentTypes) in AbstractNode.defaultFlowWhiteList) {
for (type in argumentTypes) {
register(type)

View File

@ -3,19 +3,20 @@ package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.flows.*
import net.corda.core.node.CordaPluginRegistry
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
import java.util.function.Function
object DataVending {
class Plugin : CordaPluginRegistry() {
override val servicePlugins: List<Class<*>> = listOf(Service::class.java)
override val servicePlugins = listOf(Function(::Service))
}
/**
@ -37,8 +38,6 @@ object DataVending {
val logger = loggerFor<DataVending.Service>()
}
class TransactionRejectedError(msg: String) : Exception(msg)
init {
services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler)
services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler)

View File

@ -1,3 +1,3 @@
# Register a ServiceLoader service extending from net.corda.node.CordaPluginRegistry
# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry
net.corda.node.services.NotaryChange$Plugin
net.corda.node.services.persistence.DataVending$Plugin

View File

@ -3,6 +3,8 @@ package net.corda.node
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.*
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.serialization.OpaqueBytes
@ -13,8 +15,6 @@ import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.services.User
import net.corda.node.services.messaging.CURRENT_RPC_USER
import net.corda.node.services.messaging.PermissionException
import net.corda.node.services.messaging.StateMachineUpdate
import net.corda.node.services.messaging.startFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService

View File

@ -0,0 +1,225 @@
package net.corda.node.messaging
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.LogHelper
import net.corda.node.services.RPCUserService
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.node.services.messaging.CURRENT_RPC_USER
import net.corda.node.services.messaging.CordaRPCClientImpl
import net.corda.node.services.messaging.RPCDispatcher
import net.corda.node.services.messaging.RPCSinceVersion
import net.corda.node.utilities.AffinityExecutor
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Closeable
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class ClientRPCInfrastructureTests {
// TODO: Test that timeouts work
lateinit var artemis: EmbeddedActiveMQ
lateinit var serverSession: ClientSession
lateinit var clientSession: ClientSession
lateinit var producer: ClientProducer
lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
lateinit var proxy: TestOps
private val authenticatedUser = User("test", "password", permissions = setOf())
@Before
fun setup() {
// Set up an in-memory Artemis with an RPC requests queue.
artemis = EmbeddedActiveMQ()
artemis.setConfiguration(ConfigurationImpl().apply {
acceptorConfigurations = setOf(TransportConfiguration(InVMAcceptorFactory::class.java.name))
isSecurityEnabled = false
isPersistenceEnabled = false
})
artemis.start()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(TransportConfiguration(InVMConnectorFactory::class.java.name))
val sessionFactory = serverLocator.createSessionFactory()
serverSession = sessionFactory.createSession()
serverSession.start()
serverSession.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
producer = serverSession.createProducer()
val userService = object : RPCUserService {
override fun getUser(username: String): User? = throw UnsupportedOperationException()
override val users: List<User> get() = throw UnsupportedOperationException()
}
val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService, "SomeName") {
override fun send(data: SerializedBytes<*>, toAddress: String) {
val msg = serverSession.createMessage(false).apply {
writeBodyBufferBytes(data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer.send(toAddress, msg)
}
override fun getUser(message: ClientMessage): User = authenticatedUser
}
serverThread = AffinityExecutor.ServiceAffinityExecutor("unit-tests-rpc-dispatch-thread", 1)
val serverConsumer = serverSession.createConsumer(RPC_REQUESTS_QUEUE)
serverSession.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 'BINDING_REMOVED'")
val serverNotifConsumer = serverSession.createConsumer("rpc.qremovals")
dispatcher.start(serverConsumer, serverNotifConsumer, serverThread)
clientSession = sessionFactory.createSession()
clientSession.start()
LogHelper.setLevel("+net.corda.rpc")
proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), authenticatedUser.username).proxyFor(TestOps::class.java)
}
@After
fun shutdown() {
(proxy as Closeable?)?.close()
clientSession.stop()
serverSession.stop()
artemis.stop()
serverThread.shutdownNow()
}
interface TestOps : RPCOps {
@Throws(IllegalArgumentException::class)
fun barf()
fun void()
fun someCalculation(str: String, num: Int): String
@RPCReturnsObservables
fun makeObservable(): Observable<Int>
@RPCReturnsObservables
fun makeComplicatedObservable(): Observable<Pair<String, Observable<String>>>
@RPCSinceVersion(2)
fun addedLater()
fun captureUser(): String
}
lateinit var complicatedObservable: Observable<Pair<String, Observable<String>>>
inner class TestOpsImpl : TestOps {
override val protocolVersion = 1
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
override fun void() {
}
override fun someCalculation(str: String, num: Int) = "$str $num"
override fun makeObservable(): Observable<Int> = Observable.just(1, 2, 3, 4)
override fun makeComplicatedObservable() = complicatedObservable
override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented")
override fun captureUser(): String = CURRENT_RPC_USER.get().username
}
@Test
fun `simple RPCs`() {
// Does nothing, doesn't throw.
proxy.void()
assertEquals("Barf!", assertFailsWith<IllegalArgumentException> {
proxy.barf()
}.message)
assertEquals("hi 5", proxy.someCalculation("hi", 5))
}
@Test
fun `simple observable`() {
// This tests that the observations are transmitted correctly, also completion is transmitted.
val observations = proxy.makeObservable().toBlocking().toIterable().toList()
assertEquals(listOf(1, 2, 3, 4), observations)
}
@Test
fun `complex observables`() {
// This checks that we can return an object graph with complex usage of observables, like an observable
// that emits objects that contain more observables.
val serverQuotes = PublishSubject.create<Pair<String, Observable<String>>>()
val unsubscribeLatch = CountDownLatch(1)
complicatedObservable = serverQuotes.asObservable().doOnUnsubscribe { unsubscribeLatch.countDown() }
val twainQuotes = "Mark Twain" to Observable.just(
"I have never let my schooling interfere with my education.",
"Clothes make the man. Naked people have little or no influence on society."
)
val wildeQuotes = "Oscar Wilde" to Observable.just(
"I can resist everything except temptation.",
"Always forgive your enemies - nothing annoys them so much."
)
val clientQuotes = LinkedBlockingQueue<String>()
val clientObs = proxy.makeComplicatedObservable()
val subscription = clientObs.subscribe {
val name = it.first
it.second.subscribe {
clientQuotes += "Quote by $name: $it"
}
}
val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*")
assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
assertThat(clientQuotes).isEmpty()
serverQuotes.onNext(twainQuotes)
assertEquals("Quote by Mark Twain: I have never let my schooling interfere with my education.", clientQuotes.take())
assertEquals("Quote by Mark Twain: Clothes make the man. Naked people have little or no influence on society.", clientQuotes.take())
serverQuotes.onNext(wildeQuotes)
assertEquals("Quote by Oscar Wilde: I can resist everything except temptation.", clientQuotes.take())
assertEquals("Quote by Oscar Wilde: Always forgive your enemies - nothing annoys them so much.", clientQuotes.take())
assertTrue(serverQuotes.hasObservers())
subscription.unsubscribe()
unsubscribeLatch.await()
assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
}
@Test
fun versioning() {
assertFailsWith<UnsupportedOperationException> { proxy.addedLater() }
}
@Test
fun `authenticated user is available to RPC`() {
assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username)
}
}

View File

@ -16,7 +16,7 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.RPCOps
import net.corda.core.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider