Explain the design of the RPC protocol around observables more.

Mike Hearn 2016-11-22 16:40:06 +01:00
parent 033a525001
commit ff75cb444e
2 changed files with 39 additions and 6 deletions


@@ -44,6 +44,9 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
checkStorePasswords() // Check the password.
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.
// See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html
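// A hedged sketch of what that configuration might look like. The values are illustrative and these
// ServerLocator setters come from the Artemis client API; they are not configured here yet (that is
// what the TODO above is about):
// serverLocator.setRetryInterval(1000)               // ms before the first reconnection attempt
// serverLocator.setRetryIntervalMultiplier(1.5)      // exponential back-off between attempts
// serverLocator.setReconnectAttempts(-1)             // keep retrying indefinitely
// serverLocator.setConfirmationWindowSize(16 * 1024) // required for transparent session re-attachment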
sessionFactory = serverLocator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()
@@ -79,7 +82,8 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
*
* RPC sends and receives are logged on the net.corda.rpc logger.
*
* By default there are no timeouts on calls. RPCs can survive temporary losses or changes in connectivity,
* By default there are no timeouts on calls. This is deliberate: RPCs without timeouts can survive restarts,
* maintenance downtime and moves of the server. RPCs can survive temporary losses or changes in client connectivity,
* like switching between wifi networks. You can specify a timeout at the level of a proxy. If a call times
* out it will throw [RPCException.Deadline].
*
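A hedged usage sketch of the timeout behaviour described in that doc comment; the start(username, password) and proxy(timeout) signatures, along with hostAndPort and sslConfig, are assumed for illustration rather than shown in this diff:

import java.time.Duration

val client = CordaRPCClient(hostAndPort, sslConfig)
client.start("rpcUser", "rpcPassword")
// No timeout by default: calls block until they complete and survive connectivity changes.
val ops = client.proxy(timeout = Duration.ofSeconds(30))
// Calls made through this proxy throw RPCException.Deadline if they exceed 30 seconds.
val nodeTime = ops.currentNodeTime()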


@@ -13,7 +13,6 @@ import net.corda.core.random63BitValue
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.node.services.messaging.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
@@ -37,9 +36,39 @@ import kotlin.concurrent.withLock
import kotlin.reflect.jvm.javaMethod
/**
* Core RPC engine implementation, you should be looking at [CordaRPCClient].
* Core RPC engine implementation. To learn how to use RPC, see [CordaRPCClient].
*
* @suppress
* # Design notes
*
* The way RPCs are handled is fairly standard except for the handling of observables. When an RPC might return
* an [rx.Observable] it is specially tagged. This causes the client to create a new transient queue, with a
* random ID in its name, for receiving observables and their observations. The ID is sent to the server in
* a message header. All observations are sent via this single queue.
*
* The reason for doing it this way and not the more obvious approach of one-queue-per-observable is that we want
* the queues to be *transient*, meaning their lifetime in the broker is tied to the session that created them.
* A server-side observable and its associated queue are not cost-free, to say nothing of the memory and resources
* needed to actually generate the observations themselves, so we want to ensure these cannot leak. A
* transient queue will be deleted automatically if the client session terminates, which by default happens on
* disconnect but can also be configured to happen after a short delay (this allows clients to e.g. switch IP
* address). On the server the deletion of the observations queue triggers unsubscription from the associated
* observables, which in turn may then be garbage collected.
*
* Creating a transient queue requires a roundtrip to the broker and thus doing an RPC that could return
* observables takes two server roundtrips instead of one. That's why RPCs that need this special treatment
* must be marked with [RPCReturnsObservables], instead of it being applied to every call.
*
* If the Artemis/JMS APIs allowed us to create transient queues assigned to someone else then we could
* potentially use a different design in which the node creates new transient queues (one per observable) on the
* fly. The client would then have to watch out for this and start consuming those queues as they were created.
*
* We use one queue per RPC because we don't know ahead of time how many observables the server might return, and
* often the server doesn't know either, which pushes towards a single-queue design. At the same time, the
* processing of observations returned by an RPC might be striped across multiple threads, and we'd like
* backpressure management to be scoped more finely than per client process. So we end up with
* a compromise where the unit of backpressure management is the response to a single RPC.
*
* TODO: Backpressure isn't propagated all the way through the MQ broker at the moment.
*/
class CordaRPCClientImpl(private val session: ClientSession,
private val sessionLock: ReentrantLock,
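For context, a hedged sketch of how an RPC opts in to the treatment described in the notes above; the ExampleOps interface and trackUpdates() method are invented for illustration, and RPCOps and RPCReturnsObservables are referenced as in the surrounding code rather than imported here:

import rx.Observable

interface ExampleOps : RPCOps {
    // Marks this call as possibly returning observables, so the client pays the extra broker
    // round-trip to create its transient observations queue before sending the request.
    @RPCReturnsObservables
    fun trackUpdates(): Observable<String>
}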
@@ -149,7 +178,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
// sendRequest may return a reconfigured Kryo if the method returns observables.
val kryo: Kryo = sendRequest(args, location, method) ?: createRPCKryo()
val next = receiveResponse(kryo, method, timeout)
val next: ErrorOr<*> = receiveResponse(kryo, method, timeout)
rpcLog.debug { "<- RPC <- ${method.name} = $next" }
return unwrapOrThrow(next)
}
@@ -186,7 +215,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
val returnsObservables = method.isAnnotationPresent(RPCReturnsObservables::class.java)
sessionLock.withLock {
val msg = createMessage(method)
val msg: ClientMessage = createMessage(method)
val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else null
val serializedArgs = try {
(args ?: emptyArray<Any?>()).serialize()
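To make the observable preparation step concrete, a hedged sketch of roughly what happens on the client before such a request is sent, using the Artemis core client API; the address prefix, header key and function name below are illustrative, not taken from this file:

import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import java.util.UUID

// Illustrative names; the real prefix and header key live elsewhere in the codebase.
private const val OBSERVATIONS_PREFIX = "rpc.client.observations."
private const val OBSERVATIONS_QUEUE_HEADER = "observations-queue-id"

fun prepareObservationsQueue(session: ClientSession, msg: ClientMessage): String {
    // One temporary (transient) queue per RPC, with a random ID in its name. The broker deletes
    // it when the session dies, which in turn unsubscribes the server-side observables feeding it.
    val queueId = UUID.randomUUID().toString()
    val queueName = OBSERVATIONS_PREFIX + queueId
    session.createTemporaryQueue(queueName, queueName)    // the extra broker round-trip mentioned above
    // Tell the server where to publish observations for any observables this RPC returns.
    msg.putStringProperty(OBSERVATIONS_QUEUE_HEADER, queueId)
    return queueName                                      // the caller starts a consumer on this queue
}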