corda / net.corda.client.rpc / CordaRPCClientImpl / <init>

<init>

CordaRPCClientImpl(session: ClientSession, sessionLock: ReentrantLock, username: String)

Core RPC engine implementation. To learn how to use RPC you should look at CordaRPCClient.

Design notes

The way RPCs are handled is fairly standard except for the handling of observables. When an RPC might return an Observable it is specially tagged. This causes the client to create a new transient queue, with a random ID in its name, for receiving observables and their observations. This ID is sent to the server in a message header. All observations are sent via this single queue.
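As a rough illustration of the flow just described, the sketch below creates a transient queue with a random ID, puts that ID into a header on the request message and sends the request over the Artemis core client API. The address, queue naming scheme and header key are illustrative assumptions, not the names actually used by the implementation.

    import org.apache.activemq.artemis.api.core.client.ClientSession
    import java.util.UUID

    // Hypothetical names: the real request address and header key used by the RPC layer may differ.
    const val RPC_REQUESTS_ADDRESS = "rpc.requests"
    const val OBSERVATIONS_QUEUE_HEADER = "observations-queue"

    fun sendRpcWithObservationsQueue(session: ClientSession, rpcPayload: ByteArray) {
        // One transient queue per RPC, named with a random ID. Being temporary, the broker
        // deletes it automatically when the client session that created it goes away.
        val queueName = "rpc.observations.${UUID.randomUUID()}"
        session.createTemporaryQueue(queueName, queueName)

        // Tell the server where to publish observations by putting the queue name in a header.
        val request = session.createMessage(false)
        request.putStringProperty(OBSERVATIONS_QUEUE_HEADER, queueName)
        request.bodyBuffer.writeBytes(rpcPayload)

        val producer = session.createProducer(RPC_REQUESTS_ADDRESS)
        producer.send(request)
        producer.close()
    }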

The reason for doing it this way, rather than the more obvious approach of one queue per observable, is that we want the queues to be transient, meaning their lifetime in the broker is tied to the session that created them. A server-side observable and its associated queue are not cost-free, let alone the memory and resources needed to actually generate the observations themselves, so we want to ensure these cannot leak. A transient queue will be deleted automatically if the client session terminates, which by default happens on disconnect but can also be configured to happen after a short delay (this allows clients to e.g. switch IP address). On the server, the deletion of the observations queue triggers unsubscription from the associated observables, which in turn may then be garbage collected.

Creating a transient queue requires a round trip to the broker, so an RPC that could return observables takes two server round trips instead of one. That is why we require such RPCs to be explicitly marked with RPCReturnsObservables, rather than always applying this special treatment.
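For context, here is a hedged sketch of what such a marking looks like on an RPC interface. The operation names are made up, and the annotation is redeclared locally only to keep the sketch self-contained; the real one lives in Corda's core messaging package.

    import rx.Observable

    // Stand-in for the real RPCReturnsObservables annotation referenced above.
    @Target(AnnotationTarget.FUNCTION)
    annotation class RPCReturnsObservables

    interface ExampleRpcOps {
        // Plain RPC: one request, one response, a single broker round trip.
        fun nodeCount(): Int

        // Marked RPC: the client first creates the transient observations queue (one extra
        // round trip), then performs the call; observations later arrive on that queue.
        @RPCReturnsObservables
        fun trackProgress(): Observable<String>
    }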

If the Artemis/JMS APIs allowed us to create transient queues assigned to someone else, then we could potentially use a different design in which the node creates new transient queues (one per observable) on the fly. The client would then have to watch out for this and start consuming those queues as they were created.

We use one queue per RPC because we don't know ahead of time how many observables the server might return, and often the server doesn't know either, which pushes towards a single-queue design. At the same time, the processing of observations returned by an RPC might be striped across multiple threads, and we'd like backpressure management to be scoped more finely than per client process. So we end up with a compromise in which the unit of backpressure management is the response to a single RPC.
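To make that compromise concrete, the sketch below shows one way a client might drain the single per-RPC queue and route each observation to the Rx subject for its observable, so that flow control can be applied to the RPC response as a whole. The header key, wire format and class shape are assumptions for illustration, not the actual implementation.

    import org.apache.activemq.artemis.api.core.client.ClientMessage
    import org.apache.activemq.artemis.api.core.client.ClientSession
    import rx.Observable
    import rx.subjects.PublishSubject
    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical header identifying which observable an observation belongs to.
    const val OBSERVABLE_ID_HEADER = "observable-id"

    class ObservationDemultiplexer(session: ClientSession, observationsQueueName: String) {
        // One Rx subject per observable returned by this RPC; all share the same broker queue.
        private val subjects = ConcurrentHashMap<String, PublishSubject<ByteArray>>()

        init {
            val consumer = session.createConsumer(observationsQueueName)
            consumer.setMessageHandler { message: ClientMessage ->
                val observableId = message.getStringProperty(OBSERVABLE_ID_HEADER)
                val body = ByteArray(message.bodySize).also { message.bodyBuffer.readBytes(it) }
                subjectFor(observableId).onNext(body)
                message.acknowledge()
            }
        }

        private fun subjectFor(id: String): PublishSubject<ByteArray> =
            subjects.computeIfAbsent(id) { PublishSubject.create<ByteArray>() }

        // The RPC machinery would hand one of these out for each observable in the response.
        fun observableFor(id: String): Observable<ByteArray> = subjectFor(id)
    }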

TODO: Backpressure isn't propagated all the way through the MQ broker at the moment.