mirror of
https://github.com/corda/corda.git
synced 2025-04-07 19:34:41 +00:00
add RPC client examples of connection management (#3201)
This commit is contained in:
parent
5de2c2aa4b
commit
cda5a6a8f2
@ -301,6 +301,91 @@ requires a higher version of the protocol than the server supports, ``Unsupporte
|
||||
Otherwise, if the server implementation throws an exception, that exception is serialised and rethrown on the client
|
||||
side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal.
|
||||
|
||||
Connection management
|
||||
---------------------
|
||||
It is possible to not be able to connect to the server on the first attempt. In that case, the ``CordaRPCCLient.start()``
|
||||
method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for
|
||||
such situations:
|
||||
|
||||
.. sourcecode:: Kotlin
|
||||
|
||||
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
||||
|
||||
val retryInterval = 5.seconds
|
||||
|
||||
do {
|
||||
val connection = try {
|
||||
logger.info("Connecting to: $nodeHostAndPort")
|
||||
val client = CordaRPCClient(
|
||||
nodeHostAndPort,
|
||||
object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = retryInterval
|
||||
}
|
||||
)
|
||||
val _connection = client.start(username, password)
|
||||
// Check connection is truly operational before returning it.
|
||||
val nodeInfo = _connection.proxy.nodeInfo()
|
||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||
_connection
|
||||
} catch(secEx: ActiveMQSecurityException) {
|
||||
// Happens when incorrect credentials provided - no point to retry connecting.
|
||||
throw secEx
|
||||
}
|
||||
catch(ex: RPCException) {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
logger.info("Exception upon establishing connection: " + ex.message)
|
||||
null
|
||||
}
|
||||
|
||||
if(connection != null) {
|
||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||
return connection
|
||||
}
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
} while (connection == null)
|
||||
}
|
||||
|
||||
After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw
|
||||
an exception and created observables will no longer receive observations. Below is an example of how to reconnect and
|
||||
back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler
|
||||
on the ``Observable`` returned by ``CordaRPCOps``.
|
||||
|
||||
.. sourcecode:: Kotlin
|
||||
|
||||
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
||||
|
||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
||||
val proxy = connection.proxy
|
||||
|
||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||
|
||||
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
|
||||
val subscription: Subscription = stateMachineUpdatesRaw
|
||||
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
|
||||
.subscribe({ clientCode(it) /* Client code here */ }, {
|
||||
// Terminate subscription such that nothing gets past this point to downstream Observables.
|
||||
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
|
||||
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
||||
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
||||
// force closing the connection to avoid propagation of notification to the server side.
|
||||
connection.forceClose()
|
||||
// Perform re-connect.
|
||||
performRpcReconnect(nodeHostAndPort, username, password)
|
||||
})
|
||||
|
||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||
}
|
||||
|
||||
In this code snippet it is possible to see that function ``performRpcReconnect`` creates an RPC connection and implements
|
||||
the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be made when failover
|
||||
happens then the code will terminate existing subscription, closes RPC connection and recursively calls ``performRpcReconnect``
|
||||
which will re-subscribe once RPC connection comes back online.
|
||||
|
||||
Client code if fed with instances of ``StateMachineInfo`` using call ``clientCode(it)``. Upon re-connec, this code receives
|
||||
all the items. Some of these items might have already been delivered to client code prior to failover occurred.
|
||||
It is down to client code in this case handle those duplicate items as appropriate.
|
||||
|
||||
Wire protocol
|
||||
-------------
|
||||
The client RPC wire protocol is defined and documented in ``net/corda/client/rpc/RPCApi.kt``.
|
||||
|
Loading…
x
Reference in New Issue
Block a user