From cda5a6a8f2163a9cdd50637bfceadde7c9e16441 Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Mon, 21 May 2018 13:39:38 +0100 Subject: [PATCH] add RPC client examples of connection management (#3201) --- docs/source/clientrpc.rst | 85 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 1c0999ad8c..6bf0f91483 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -301,6 +301,91 @@ requires a higher version of the protocol than the server supports, ``Unsupporte Otherwise, if the server implementation throws an exception, that exception is serialised and rethrown on the client side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal. +Connection management +--------------------- +It is possible to not be able to connect to the server on the first attempt. In that case, the ``CordaRPCCLient.start()`` +method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for +such situations: + +.. sourcecode:: Kotlin + + fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection { + + val retryInterval = 5.seconds + + do { + val connection = try { + logger.info("Connecting to: $nodeHostAndPort") + val client = CordaRPCClient( + nodeHostAndPort, + object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = retryInterval + } + ) + val _connection = client.start(username, password) + // Check connection is truly operational before returning it. + val nodeInfo = _connection.proxy.nodeInfo() + require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()) + _connection + } catch(secEx: ActiveMQSecurityException) { + // Happens when incorrect credentials provided - no point to retry connecting. + throw secEx + } + catch(ex: RPCException) { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + logger.info("Exception upon establishing connection: " + ex.message) + null + } + + if(connection != null) { + logger.info("Connection successfully established with: $nodeHostAndPort") + return connection + } + // Could not connect this time round - pause before giving another try. + Thread.sleep(retryInterval.toMillis()) + } while (connection == null) + } + +After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw +an exception and created observables will no longer receive observations. Below is an example of how to reconnect and +back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler +on the ``Observable`` returned by ``CordaRPCOps``. + +.. sourcecode:: Kotlin + + fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { + + val connection = establishConnectionWithRetry(nodeHostAndPort, username, password) + val proxy = connection.proxy + + val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed() + + val retryableStateMachineUpdatesSubscription: AtomicReference = AtomicReference(null) + val subscription: Subscription = stateMachineUpdatesRaw + .startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) }) + .subscribe({ clientCode(it) /* Client code here */ }, { + // Terminate subscription such that nothing gets past this point to downstream Observables. + retryableStateMachineUpdatesSubscription.get()?.unsubscribe() + // It is good idea to close connection to properly mark the end of it. During re-connect we will create a new + // client and a new connection, so no going back to this one. Also the server might be down, so we are + // force closing the connection to avoid propagation of notification to the server side. + connection.forceClose() + // Perform re-connect. + performRpcReconnect(nodeHostAndPort, username, password) + }) + + retryableStateMachineUpdatesSubscription.set(subscription) + } + +In this code snippet it is possible to see that function ``performRpcReconnect`` creates an RPC connection and implements +the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be made when failover +happens then the code will terminate existing subscription, closes RPC connection and recursively calls ``performRpcReconnect`` +which will re-subscribe once RPC connection comes back online. + +Client code if fed with instances of ``StateMachineInfo`` using call ``clientCode(it)``. Upon re-connec, this code receives +all the items. Some of these items might have already been delivered to client code prior to failover occurred. +It is down to client code in this case handle those duplicate items as appropriate. + Wire protocol ------------- The client RPC wire protocol is defined and documented in ``net/corda/client/rpc/RPCApi.kt``.