CORDA-2858: Wire-up Corda components with better RPC reconnect logic (#4933)
Implemented the ReconnectingRPC into the WebServer, Standalone Shell, Explorer and BankOfCordaClientApi
@ -32,13 +32,13 @@ class NetworkIdentityModel {
private val identityCache = Caffeine.newBuilder()
.build<PublicKey, ObservableValue<NodeInfo?>>(CacheLoader { publicKey: PublicKey ->
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
publicKey.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
val notaryNodes: ObservableList<NodeInfo> = notaries.map { rpcProxy.value?.cordaRPCOps?.nodeInfoFromParty(it) }.filterNotNull()
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) }, "notaries")
val notaryNodes: ObservableList<NodeInfo> = notaries.map { rpcProxy.value?.nodeInfoFromParty(it) }.filterNotNull()
val parties: ObservableList<NodeInfo> = networkIdentities
.filtered { it.legalIdentities.all { it !in notaries } }.unique()
val myIdentity = rpcProxy.map { it?.cordaRPCOps?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
fun partyFromPublicKey(publicKey: PublicKey): ObservableValue<NodeInfo?> = identityCache[publicKey]!!
@ -1,15 +1,12 @@
package net.corda.client.jfx.model
import com.sun.javafx.application.PlatformImpl
import javafx.application.Platform
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.asReconnectingWithInitialValues
import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.staticField
import net.corda.core.messaging.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.Vault
@ -20,12 +17,8 @@ import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import rx.Observable
import rx.Subscription
import rx.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) {
companion object {
@ -42,14 +35,12 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess
class NodeMonitorModel : AutoCloseable {
private val retryableStateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
private val stateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
private val vaultUpdatesSubject = PublishSubject.create<Vault.Update<ContractState>>()
private val transactionsSubject = PublishSubject.create<SignedTransaction>()
private val stateMachineTransactionMappingSubject = PublishSubject.create<StateMachineTransactionMapping>()
private val progressTrackingSubject = PublishSubject.create<ProgressTrackingEvent>()
private val networkMapSubject = PublishSubject.create<MapChange>()
private var rpcConnection: CordaRPCConnection? = null
val stateMachineUpdates: Observable<StateMachineUpdate> = stateMachineUpdatesSubject
val vaultUpdates: Observable<Vault.Update<ContractState>> = vaultUpdatesSubject
@ -58,39 +49,20 @@ class NodeMonitorModel : AutoCloseable {
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<MapChange> = networkMapSubject
val proxyObservable = SimpleObjectProperty<CordaRPCOpsWrapper?>()
private lateinit var rpc: CordaRPCOps
val proxyObservable = SimpleObjectProperty<CordaRPCOps>()
lateinit var notaryIdentities: List<Party>
companion object {
val logger = contextLogger()
private fun runLaterIfInitialized(op: () -> Unit) {
val initialized = PlatformImpl::class.java.staticField<AtomicBoolean>("initialized")
// Only execute using "runLater()" if JavaFX been initialized.
// It may not be initialized in the unit test.
// Also if we are already in the JavaFX thread - perform direct invocation without postponing it.
if (initialized.value.get() && !Platform.isFxApplicationThread()) {
} else {
* This is needed as JavaFX listener framework attempts to call `equals()` before dispatching notification change.
* And calling `CordaRPCOps.equals()` results in (unhandled) remote call.
class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps)
* Disconnects from the Corda node for a clean client shutdown.
override fun close() {
try {
(rpc as ReconnectingCordaRPCOps).close()
} catch (e: Exception) {
logger.error("Error closing RPC connection to node", e)
@ -101,41 +73,40 @@ class NodeMonitorModel : AutoCloseable {
* TODO provide an unsubscribe mechanism
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
// `retryableStateMachineUpdatesSubject` will change it's upstream subscriber in case of RPC connection failure, this `Observable` should
// never produce an error.
// `stateMachineUpdatesSubject` will stay firmly subscribed to `retryableStateMachineUpdatesSubject`
rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password)
// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
proxyObservable.addListener { _, _, wrapper ->
if (wrapper != null) {
val proxy = wrapper.cordaRPCOps
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet())
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext, {})
proxyObservable.value = rpc
// Transactions
val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext, {})
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = rpc.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet())
onNext = vaultUpdatesSubject::onNext,
onDisconnect = { Platform.runLater { proxyObservable.value = null } },
onReconnect = { Platform.runLater { proxyObservable.value = rpc } },
onStop = {})
// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext, {})
// Transactions
val (transactions, newTransactions) = rpc.internalVerifiedTransactionsFeed()
// Parties on network
val (parties, futurePartyUpdate) = proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext, {})
// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed()
val stateMachines = performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = false)
// Parties on network
val (parties, futurePartyUpdate) = rpc.networkMapFeed()
val stateMachines = rpc.stateMachinesSnapshot()
notaryIdentities = rpc.notaryIdentities()
// Extract the flow tracking stream
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
@ -155,73 +126,4 @@ class NodeMonitorModel : AutoCloseable {
// We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers.
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): List<StateMachineInfo> {
val proxy = establishConnectionWithRetry(nodeHostAndPort, username, password, shouldRetry).let { connection ->
rpcConnection = connection
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
val subscription: Subscription = stateMachineUpdatesRaw
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
.subscribe({ retryableStateMachineUpdatesSubject.onNext(it) }, {
// Terminate subscription such that nothing gets past this point to downstream Observables.
// Flag to everyone that proxy is no longer available.
runLaterIfInitialized { proxyObservable.set(null) }
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
// client and a new connection, so no going back to this one. Also the server might be down, so we are
// force closing the connection to avoid propagation of notification to the server side.
// Perform re-connect.
performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = true)
runLaterIfInitialized { proxyObservable.set(CordaRPCOpsWrapper(proxy)) }
notaryIdentities = proxy.notaryIdentities()
return stateMachineInfos
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): CordaRPCConnection {
val retryInterval = 5.seconds
val client = CordaRPCClient(
connectionMaxRetryInterval = retryInterval
do {
val connection = try {
logger.info("Connecting to: $nodeHostAndPort")
val _connection = client.start(username, password)
// Check connection is truly operational before returning it.
val nodeInfo = _connection.proxy.nodeInfo()
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()){"No identity certificates found"}
} catch (exception: Exception) {
if (shouldRetry) {
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
logger.info("Exception upon establishing connection: {}", exception.message)
} else {
throw exception
if (connection != null) {
logger.info("Connection successfully established with: $nodeHostAndPort")
return connection
// Could not connect this time round - pause before giving another try.
} while (connection == null)
throw IllegalArgumentException("Never reaches here")
@ -115,7 +115,7 @@ class TransactionDataModel {
val partiallyResolvedTransactions = collectedTransactions.map {
it.inputs.map { stateRef ->
stateRef to rpcProxy.value!!.cordaRPCOps.internalFindVerifiedTransaction(stateRef.txhash)
stateRef to rpcProxy.value!!.internalFindVerifiedTransaction(stateRef.txhash)
@ -1,19 +1,15 @@
package net.corda.bank.api
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.RPCException
import net.corda.core.messaging.StateMachineUpdate
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.testing.http.HttpApi
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import rx.Subscription
import java.util.concurrent.atomic.AtomicReference
* Interface for communicating with Bank of Corda node
@ -46,10 +42,8 @@ object BankOfCordaClientApi {
* @return a cash issue transaction.
fun requestRPCIssueHA(availableRpcServers: List<NetworkHostAndPort>, params: IssueRequestParams): SignedTransaction {
val client = performRpcReconnect(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD)
// TODO: privileged security controls required
client.use { connection ->
val rpc = connection.proxy
ReconnectingCordaRPCOps(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD).use { rpc->
// Resolve parties via RPC
@ -66,63 +60,4 @@ object BankOfCordaClientApi {
// DOCSTART rpcClientConnectionRecovery
fun performRpcReconnect(nodeHostAndPorts: List<NetworkHostAndPort>, username: String, password: String): CordaRPCConnection {
val connection = establishConnectionWithRetry(nodeHostAndPorts, username, password)
val proxy = connection.proxy
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
val subscription: Subscription = stateMachineUpdatesRaw
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
.subscribe({ /* Client code here */ }, {
// Terminate subscription such that nothing gets past this point to downstream Observables.
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
// client and a new connection, so no going back to this one. Also the server might be down, so we are
// force closing the connection to avoid propagation of notification to the server side.
// Perform re-connect.
performRpcReconnect(nodeHostAndPorts, username, password)
return connection
// DOCEND rpcClientConnectionRecovery
// DOCSTART rpcClientConnectionWithRetry
private fun establishConnectionWithRetry(nodeHostAndPorts: List<NetworkHostAndPort>, username: String, password: String): CordaRPCConnection {
val retryInterval = 5.seconds
var connection: CordaRPCConnection?
do {
connection = try {
logger.info("Connecting to: $nodeHostAndPorts")
val client = CordaRPCClient(
CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval)
val _connection = client.start(username, password)
// Check connection is truly operational before returning it.
val nodeInfo = _connection.proxy.nodeInfo()
} catch (secEx: ActiveMQSecurityException) {
// Happens when incorrect credentials provided - no point retrying connection
logger.info("Security exception upon attempt to establish connection: " + secEx.message)
throw secEx
} catch (ex: RPCException) {
logger.info("Exception upon attempt to establish connection: " + ex.message)
null // force retry after sleep
// Could not connect this time round - pause before giving another try.
} while (connection == null)
logger.info("Connection successfully established with: ${connection.proxy.nodeInfo()}")
return connection
// DOCEND rpcClientConnectionWithRetry
@ -16,7 +16,7 @@ class IssuerModel {
private val defaultCurrency = Currency.getInstance("USD")
private val proxy by observableValue(NodeMonitorModel::proxyObservable)
private val cashAppConfiguration = proxy.map { it?.cordaRPCOps?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() }
private val cashAppConfiguration = proxy.map { it?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() }
val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.singletonObservableList(defaultCurrency) }, "supportedCurrencies")
val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() }, "currencyTypes")
@ -95,9 +95,9 @@ class NewTransaction : Fragment() {
val handle: FlowHandle<AbstractCashFlow.Result> = when (request) {
is IssueAndPaymentRequest -> rpcProxy.value!!.cordaRPCOps.startFlow(::CashIssueAndPaymentFlow, request)
is PaymentRequest -> rpcProxy.value!!.cordaRPCOps.startFlow(::CashPaymentFlow, request)
is ExitRequest -> rpcProxy.value!!.cordaRPCOps.startFlow(::CashExitFlow, request)
is IssueAndPaymentRequest -> rpcProxy.value!!.startFlow(::CashIssueAndPaymentFlow, request)
is PaymentRequest -> rpcProxy.value!!.startFlow(::CashPaymentFlow, request)
is ExitRequest -> rpcProxy.value!!.startFlow(::CashExitFlow, request)
else -> throw IllegalArgumentException("Unexpected request type: $request")
runAsync {
@ -81,7 +81,7 @@ class StandaloneShell : CordaCliWrapper("corda-shell", "The Corda standalone she
password = String(readPassword("Password:"))
InteractiveShell.startShell(configuration, classLoader)
InteractiveShell.startShell(configuration, classLoader, true)
try {
//connecting to node by requesting node info to fail fast
@ -11,8 +11,10 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.ReconnectingObservable
import net.corda.client.rpc.internal.asReconnectingWithInitialValues
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
@ -71,9 +73,9 @@ import kotlin.concurrent.thread
object InteractiveShell {
private val log = LoggerFactory.getLogger(javaClass)
private lateinit var rpcOps: (username: String, credentials: String) -> CordaRPCOps
private lateinit var rpcOps: (username: String, password: String) -> CordaRPCOps
private lateinit var ops: CordaRPCOps
private lateinit var connection: CordaRPCConnection
private lateinit var rpcConn: AutoCloseable
private var shell: Shell? = null
private var classLoader: ClassLoader? = null
private lateinit var shellConfiguration: ShellConfiguration
@ -87,20 +89,23 @@ object InteractiveShell {
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
* internals.
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
rpcOps = { username: String, credentials: String ->
val client = CordaRPCClient(hostAndPort = configuration.hostAndPort,
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 1
sslConfiguration = configuration.ssl,
classLoader = classLoader)
this.connection = client.start(username, credentials)
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
rpcOps = { username: String, password: String ->
if (standalone) {
ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, configuration.ssl, classLoader).also {
rpcConn = it
} else {
val client = CordaRPCClient(hostAndPort = configuration.hostAndPort,
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 1
sslConfiguration = configuration.ssl,
classLoader = classLoader)
val connection = client.start(username, password)
rpcConn = connection
_startShell(configuration, classLoader)
@ -457,7 +462,11 @@ object InteractiveShell {
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
val subscriber = FlowWatchPrintingSubscriber(out)
if (stateMachineUpdates is ReconnectingObservable<*>) {
} else {
var result: Any? = subscriber.future
if (result is Future<*>) {
if (!result.isDone) {
@ -561,7 +570,7 @@ object InteractiveShell {
// When completed.
// This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
@ -5,6 +5,7 @@ import io.netty.channel.unix.Errors
import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.contextLogger
@ -43,7 +44,7 @@ class NodeWebServer(val config: WebServerConfig) {
fun start() {
logAndMaybePrint("Starting as webserver: ${config.webAddress}")
server = initWebServer(retryConnectLocalRpc())
server = initWebServer(reconnectingCordaRPCOps())
fun run() {
@ -175,34 +176,7 @@ class NodeWebServer(val config: WebServerConfig) {
private fun retryConnectLocalRpc(): CordaRPCOps {
while (true) {
try {
return connectLocalRpcAsNodeUser()
} catch (e: RPCException) {
log.debug("Could not connect to ${config.rpcAddress} due to exception: ", e)
// This error will happen if the server has yet to create the keystore
// Keep the fully qualified package name due to collisions with the Kotlin stdlib
// exception of the same name
} catch (e: NoSuchFileException) {
log.debug("Tried to open a file that doesn't yet exist, retrying", e)
} catch (e: Throwable) {
// E.g. a plugin cannot be instantiated?
// Note that we do want the exception stacktrace.
log.error("Cannot start WebServer", e)
throw e
private fun connectLocalRpcAsNodeUser(): CordaRPCOps {
log.info("Connecting to node at ${config.rpcAddress} as ${config.runAs}")
val client = CordaRPCClient(hostAndPort = config.rpcAddress, classLoader = javaClass.classLoader)
val connection = client.start(config.runAs.username, config.runAs.password)
return connection.proxy
private fun reconnectingCordaRPCOps() = ReconnectingCordaRPCOps(config.rpcAddress, config.runAs.username , config.runAs.password, null, javaClass.classLoader)
/** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */
val pluginRegistries: List<WebServerPluginRegistry> by lazy {
