From aff7d0bb0938eb94ba16ba02bd2551c9be0ab2ff Mon Sep 17 00:00:00 2001
From: Viktor Kolomeyko
Date: Mon, 14 May 2018 10:51:04 +0100
Subject: [PATCH] ENT-1865: Guidelines for creating RPC clients compatible with
 Hot/Cold node deployment. (#826)

* ENT-1865: Guidelines for creating RPC clients compatible with Hot/Cold node deployment.

* ENT-1865: Changes to documentation following review by @thschroeter

* ENT-1865: Changes to documentation following review by @mnesbit
---
 docs/source/high-availablility.rst | 97 +++++++++++++++++++++++++++++-
 1 file changed, 95 insertions(+), 2 deletions(-)

diff --git a/docs/source/high-availablility.rst b/docs/source/high-availablility.rst
index 61fdce53e9..cf843eb9d9 100644
--- a/docs/source/high-availablility.rst
+++ b/docs/source/high-availablility.rst
@@ -7,9 +7,9 @@ Hot Cold
 ~~~~~~~~
 
 In the hot cold configuration, failover is handled manually, by promoting the cold node after the former hot node
-failed or was taken offline for maintainance.
+failed or was taken offline for maintenance.
 
-RPC clients have to handle ``RPCException`` and implement application specific recovery and retry.
+For RPC clients there is a way to recover in case of failover, see the section below.
 
 Prerequisites
 -------------
@@ -53,6 +53,99 @@ Fields
 :waitInterval: Amount of time(milliseconds) to wait since last mutual exclusion lease update before being able to
     become the master node. This has to be greater than updateInterval.
 
+RPC failover
+------------
+
+In the case of hot-cold there will be a short period of time when none of the nodes is available and accepting connections.
+If the RPC client has not been connected at all and makes its first RPC connection during this instability window, the connection will be rejected
+as if the server address does not exist. The only choice the client has in this case is to catch the corresponding exception during ``CordaRPCClient.start()``
+and keep on retrying.
+
+The following code snippet illustrates that.
+
+.. 
sourcecode:: Kotlin
+
+    fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
+
+        val retryInterval = 5.seconds
+
+        do {
+            val connection = try {
+                logger.info("Connecting to: $nodeHostAndPort")
+                val client = CordaRPCClient(
+                        nodeHostAndPort,
+                        object : CordaRPCClientConfiguration {
+                            override val connectionMaxRetryInterval = retryInterval
+                        }
+                )
+                val _connection = client.start(username, password)
+                // Check that the connection is truly operational before returning it.
+                val nodeInfo = _connection.proxy.nodeInfo()
+                require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
+                _connection
+            } catch(secEx: ActiveMQSecurityException) {
+                // Happens when incorrect credentials are provided - no point in retrying the connection.
+                throw secEx
+            }
+            catch(th: Throwable) {
+                // Deliberately not logging the full stack trace as it would be full of internal stack traces.
+                logger.info("Exception upon establishing connection: " + th.message)
+                null
+            }
+
+            if(connection != null) {
+                logger.info("Connection successfully established with: $nodeHostAndPort")
+                return connection
+            }
+            // Could not connect this time round - pause before giving it another try.
+            Thread.sleep(retryInterval.toMillis())
+        } while (connection == null)
+
+        throw IllegalArgumentException("Never reaches here")
+    }
+
+If, however, the RPC client was connected through a load balancer to a node and failover occurred, it will take some time for the cold instance to start up.
+Acceptable behavior in this case would be for the RPC client to keep retrying to connect and, once connected, back-fill any data that might have been missed while the connection was down.
+In a way this scenario is no different from a temporary loss of connectivity with a node, even without any form of High Availability.
+
+In order to achieve the said retry/back-fill functionality the client needs to install an ``onError`` handler on the ``Observable`` returned by ``CordaRPCOps``. 
+Please see the code below, which illustrates how this can be achieved.
+
+.. sourcecode:: Kotlin
+
+    fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
+
+        val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
+        val proxy = connection.proxy
+
+        val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
+
+        val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
+        val subscription: Subscription = stateMachineUpdatesRaw
+                .startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
+                .subscribe({ clientCode(it) /* Client code here */ }, {
+                    // Terminate the subscription such that nothing gets past this point to downstream Observables.
+                    retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
+                    // It is a good idea to close the connection to properly mark the end of it. During re-connect we will create a new
+                    // client and a new connection, so there is no going back to this one. Also the server might be down, so we are
+                    // force closing the connection to avoid propagation of notifications to the server side.
+                    connection.forceClose()
+                    // Perform re-connect.
+                    performRpcReconnect(nodeHostAndPort, username, password)
+                })
+
+        retryableStateMachineUpdatesSubscription.set(subscription)
+    }
+
+In this code snippet it is possible to see that the function ``performRpcReconnect`` creates an RPC connection and installs an error handler
+upon subscription to an ``Observable``. When failover happens this ``onError`` handler will be called; the code then
+terminates the existing subscription, closes the RPC connection and recursively calls ``performRpcReconnect``, which will re-subscribe
+once the RPC connection comes back online.
+
+Client code is fed with instances of ``StateMachineUpdate`` via the call ``clientCode(it)``. Upon re-connect this code receives
+all the items. Some of these items might have already been delivered to client code prior to the failover. 
+It is down to the client code in this case to remember what it has already processed and handle such duplicate items as appropriate.
+
 Hot Warm
 ~~~~~~~~