mirror of
https://github.com/corda/corda.git
synced 2025-02-20 09:26:41 +00:00
CORDA-2741 RPC client connection management section not fully working (#4870)
* RPC Client using HA addresses. * Fix incorrect document code snippets by referencing working, compilable code. * Minor updates following PR review.
This commit is contained in:
parent
0551ba992b
commit
94d827ebe4
@ -358,82 +358,33 @@ It is possible to not be able to connect to the server on the first attempt. In
|
||||
method will throw an exception. The following code snippet is an example of how to write a simple retry mechanism for
|
||||
such situations:
|
||||
|
||||
.. sourcecode:: Kotlin
|
||||
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART rpcClientConnectionWithRetry
|
||||
:end-before: DOCEND rpcClientConnectionWithRetry
|
||||
|
||||
fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
||||
val retryInterval = 5.seconds
|
||||
|
||||
do {
|
||||
val connection = try {
|
||||
logger.info("Connecting to: $nodeHostAndPort")
|
||||
val client = CordaRPCClient(
|
||||
nodeHostAndPort,
|
||||
object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = retryInterval
|
||||
}
|
||||
)
|
||||
val _connection = client.start(username, password)
|
||||
// Check connection is truly operational before returning it.
|
||||
val nodeInfo = _connection.proxy.nodeInfo()
|
||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||
_connection
|
||||
} catch(secEx: ActiveMQSecurityException) {
|
||||
// Happens when incorrect credentials provided - no point to retry connecting.
|
||||
throw secEx
|
||||
}
|
||||
catch(ex: RPCException) {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
logger.info("Exception upon establishing connection: " + ex.message)
|
||||
null
|
||||
}
|
||||
|
||||
if(connection != null) {
|
||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||
return connection
|
||||
}
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
} while (connection == null)
|
||||
}
|
||||
.. warning:: The list of ``NetworkHostAndPort`` passed to this function should represent one or more addresses reflecting the number of
|
||||
instances of a node configured to service the client RPC request. See ``haAddressPool`` in `CordaRPCClient`_ for further information on
|
||||
using an RPC Client for load balancing and failover.
|
||||
|
||||
After a successful connection, it is possible for the server to become unavailable. In this case, all RPC calls will throw
|
||||
an exception and created observables will no longer receive observations. Below is an example of how to reconnect and
|
||||
back-fill any data that might have been missed while the connection was down. This is done by using the ``onError`` handler
|
||||
on the ``Observable`` returned by ``CordaRPCOps``.
|
||||
|
||||
.. sourcecode:: Kotlin
|
||||
.. literalinclude:: ../../samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART rpcClientConnectionRecovery
|
||||
:end-before: DOCEND rpcClientConnectionRecovery
|
||||
|
||||
fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
||||
val proxy = connection.proxy
|
||||
In this code snippet it is possible to see that the function ``performRpcReconnect`` creates an RPC connection and implements
|
||||
the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be triggered upon failover, at which
|
||||
point the client will terminate its existing subscription, close its RPC connection and recursively call ``performRpcReconnect``,
|
||||
which will re-subscribe once the RPC connection is re-established.
|
||||
|
||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||
|
||||
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
|
||||
val subscription: Subscription = stateMachineUpdatesRaw
|
||||
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
|
||||
.subscribe({ clientCode(it) /* Client code here */ }, {
|
||||
// Terminate subscription such that nothing gets past this point to downstream Observables.
|
||||
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
|
||||
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
||||
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
||||
// force closing the connection to avoid propagation of notification to the server side.
|
||||
connection.forceClose()
|
||||
// Perform re-connect.
|
||||
performRpcReconnect(nodeHostAndPort, username, password)
|
||||
})
|
||||
|
||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||
}
|
||||
|
||||
In this code snippet it is possible to see that function ``performRpcReconnect`` creates an RPC connection and implements
|
||||
the error handler upon subscription to an ``Observable``. The call to this ``onError`` handler will be made when failover
|
||||
happens then the code will terminate existing subscription, closes RPC connection and recursively calls ``performRpcReconnect``
|
||||
which will re-subscribe once RPC connection comes back online.
|
||||
|
||||
Client code if fed with instances of ``StateMachineInfo`` using call ``clientCode(it)``. Upon re-connecting, this code receives
|
||||
all the items. Some of these items might have already been delivered to client code prior to failover occurred.
|
||||
It is down to client code in this case handle those duplicate items as appropriate.
|
||||
Within the body of the ``subscribe`` function itself, the client code receives instances of ``StateMachineInfo``. Upon re-connecting, this code receives
|
||||
*all* the instances of ``StateMachineInfo``, some of which may already been delivered to the client code prior to previous disconnect.
|
||||
It is the responsibility of the client code to handle potential duplicated instances of ``StateMachineInfo`` as appropriate.
|
||||
|
||||
Wire security
|
||||
-------------
|
||||
|
@ -5,7 +5,6 @@ import net.corda.bank.api.BankOfCordaClientApi
|
||||
import net.corda.bank.api.BankOfCordaWebApi
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.testing.core.BOC_NAME
|
||||
@ -48,8 +47,7 @@ object IssueCash {
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
fun requestRpcIssue(amount: Amount<Currency>): SignedTransaction {
|
||||
private fun requestRpcIssue(amount: Amount<Currency>): SignedTransaction {
|
||||
return BankOfCordaClientApi.requestRPCIssue(NetworkHostAndPort("localhost", BOC_RPC_PORT), createParams(amount, NOTARY_NAME))
|
||||
}
|
||||
|
||||
|
@ -2,13 +2,18 @@ package net.corda.bank.api
|
||||
|
||||
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.core.messaging.StateMachineUpdate
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.finance.flows.CashIssueAndPaymentFlow
|
||||
import net.corda.testing.http.HttpApi
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import rx.Subscription
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
/**
|
||||
* Interface for communicating with Bank of Corda node
|
||||
@ -17,6 +22,8 @@ object BankOfCordaClientApi {
|
||||
const val BOC_RPC_USER = "bankUser"
|
||||
const val BOC_RPC_PWD = "test"
|
||||
|
||||
private val logger = loggerFor<BankOfCordaClientApi>()
|
||||
|
||||
/**
|
||||
* HTTP API
|
||||
*/
|
||||
@ -29,12 +36,19 @@ object BankOfCordaClientApi {
|
||||
/**
|
||||
* RPC API
|
||||
*
|
||||
* @return a pair of the issuing and payment transactions.
|
||||
* @return a payment transaction (following successful issuance of cash to self).
|
||||
*/
|
||||
fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction {
|
||||
val client = CordaRPCClient(rpcAddress)
|
||||
fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction = requestRPCIssueHA(listOf(rpcAddress), params)
|
||||
|
||||
/**
|
||||
* RPC API
|
||||
*
|
||||
* @return a cash issue transaction.
|
||||
*/
|
||||
fun requestRPCIssueHA(availableRpcServers: List<NetworkHostAndPort>, params: IssueRequestParams): SignedTransaction {
|
||||
val client = performRpcReconnect(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD)
|
||||
// TODO: privileged security controls required
|
||||
client.start(BOC_RPC_USER, BOC_RPC_PWD).use { connection ->
|
||||
client.use { connection ->
|
||||
val rpc = connection.proxy
|
||||
rpc.waitUntilNetworkReady().getOrThrow()
|
||||
|
||||
@ -47,8 +61,68 @@ object BankOfCordaClientApi {
|
||||
val anonymous = true
|
||||
val issuerBankPartyRef = OpaqueBytes.of(params.issuerBankPartyRef.toByte())
|
||||
|
||||
logger.info("${rpc.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...")
|
||||
return rpc.startFlow(::CashIssueAndPaymentFlow, params.amount, issuerBankPartyRef, issueToParty, anonymous, notaryLegalIdentity)
|
||||
.returnValue.getOrThrow().stx
|
||||
}
|
||||
}
|
||||
|
||||
// DOCSTART rpcClientConnectionRecovery
|
||||
fun performRpcReconnect(nodeHostAndPorts: List<NetworkHostAndPort>, username: String, password: String): CordaRPCConnection {
|
||||
val connection = establishConnectionWithRetry(nodeHostAndPorts, username, password)
|
||||
val proxy = connection.proxy
|
||||
|
||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||
|
||||
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
|
||||
val subscription: Subscription = stateMachineUpdatesRaw
|
||||
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
|
||||
.subscribe({ /* Client code here */ }, {
|
||||
// Terminate subscription such that nothing gets past this point to downstream Observables.
|
||||
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
|
||||
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
||||
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
||||
// force closing the connection to avoid propagation of notification to the server side.
|
||||
connection.forceClose()
|
||||
// Perform re-connect.
|
||||
performRpcReconnect(nodeHostAndPorts, username, password)
|
||||
})
|
||||
|
||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||
return connection
|
||||
}
|
||||
// DOCEND rpcClientConnectionRecovery
|
||||
|
||||
// DOCSTART rpcClientConnectionWithRetry
|
||||
private fun establishConnectionWithRetry(nodeHostAndPorts: List<NetworkHostAndPort>, username: String, password: String): CordaRPCConnection {
|
||||
val retryInterval = 5.seconds
|
||||
var connection: CordaRPCConnection?
|
||||
do {
|
||||
connection = try {
|
||||
logger.info("Connecting to: $nodeHostAndPorts")
|
||||
val client = CordaRPCClient(
|
||||
nodeHostAndPorts,
|
||||
CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval)
|
||||
)
|
||||
val _connection = client.start(username, password)
|
||||
// Check connection is truly operational before returning it.
|
||||
val nodeInfo = _connection.proxy.nodeInfo()
|
||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||
_connection
|
||||
} catch (secEx: ActiveMQSecurityException) {
|
||||
// Happens when incorrect credentials provided - no point retrying connection
|
||||
logger.info("Security exception upon attempt to establish connection: " + secEx.message)
|
||||
throw secEx
|
||||
} catch (ex: RPCException) {
|
||||
logger.info("Exception upon attempt to establish connection: " + ex.message)
|
||||
null // force retry after sleep
|
||||
}
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
} while (connection == null)
|
||||
|
||||
logger.info("Connection successfully established with: ${connection.proxy.nodeInfo()}")
|
||||
return connection
|
||||
}
|
||||
// DOCEND rpcClientConnectionWithRetry
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user