CORDA-3141: Add GracefulReconnect callbacks which allow logic to be performed when RPC disconnects unexpectedly (#5430)

Also removed potential for growing stack trace on reconnects.
This commit is contained in:
Ryan Fowler 2019-09-17 10:00:27 +01:00 committed by Shams Asari
parent fb5f4fadaf
commit 75e66f9db9
10 changed files with 192 additions and 87 deletions

View File

@ -1,6 +1,7 @@
package net.corda.client.jfx.model package net.corda.client.jfx.model
import javafx.beans.property.SimpleObjectProperty import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.contracts.ContractState import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
@ -71,7 +72,7 @@ class NodeMonitorModel : AutoCloseable {
* TODO provide an unsubscribe mechanism * TODO provide an unsubscribe mechanism
*/ */
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password) rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT)
proxyObservable.value = rpc proxyObservable.value = rpc

View File

@ -3,6 +3,7 @@ package net.corda.client.rpcreconnect
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCClientTest import net.corda.client.rpc.CordaRPCClientTest
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.messaging.startTrackedFlow import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
@ -30,6 +31,8 @@ class CordaRPCClientReconnectionTest {
private val portAllocator = incrementalPortAllocation() private val portAllocator = incrementalPortAllocation()
private val gracefulReconnect = GracefulReconnect()
companion object { companion object {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all())) val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
} }
@ -53,7 +56,7 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5 maxReconnectAttempts = 5
)) ))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps).use { (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it val rpcOps = it
val networkParameters = rpcOps.networkParameters val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
@ -68,7 +71,7 @@ class CordaRPCClientReconnectionTest {
val networkParametersAfterCrash = rpcOps.networkParameters val networkParametersAfterCrash = rpcOps.networkParameters
assertThat(networkParameters).isEqualTo(networkParametersAfterCrash) assertThat(networkParameters).isEqualTo(networkParametersAfterCrash)
assertTrue { assertTrue {
latch.await(2, TimeUnit.SECONDS) latch.await(20, TimeUnit.SECONDS)
} }
} }
} }
@ -93,7 +96,7 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5 maxReconnectAttempts = 5
)) ))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps).use { (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it val rpcOps = it
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
val subscription = cashStatesFeed.updates.subscribe { latch.countDown() } val subscription = cashStatesFeed.updates.subscribe { latch.countDown() }
@ -133,7 +136,7 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5 maxReconnectAttempts = 5
)) ))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps).use { (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it val rpcOps = it
val networkParameters = rpcOps.networkParameters val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)

View File

@ -41,8 +41,20 @@ class CordaRPCConnection private constructor(
companion object { companion object {
@CordaInternal @CordaInternal
internal fun createWithGracefulReconnection(username: String, password: String, addresses: List<NetworkHostAndPort>): CordaRPCConnection { internal fun createWithGracefulReconnection(
return CordaRPCConnection(null, ReconnectingCordaRPCOps(addresses, username, password)) username: String,
password: String,
addresses: List<NetworkHostAndPort>,
rpcConfiguration: CordaRPCClientConfiguration,
gracefulReconnect: GracefulReconnect
): CordaRPCConnection {
return CordaRPCConnection(null, ReconnectingCordaRPCOps(
addresses,
username,
password,
rpcConfiguration,
gracefulReconnect
))
} }
} }
@ -241,6 +253,20 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
} }
/**
* GracefulReconnect provides the opportunity to perform certain logic when the RPC encounters a connection disconnect
* during communication with the node.
*
* NOTE: The callbacks provided may be executed on a separate thread to that which called the RPC command.
*
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
*/
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}) {
constructor(onDisconnect: Runnable, onReconnect: Runnable ) :
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() })
}
/** /**
* An RPC client connects to the specified server and allows you to make calls to the server that perform various * An RPC client connects to the specified server and allows you to make calls to the server that perform various
* useful tasks. Please see the Client RPC section of docs.corda.net to learn more about how this API works. A brief * useful tasks. Please see the Client RPC section of docs.corda.net to learn more about how this API works. A brief
@ -371,11 +397,11 @@ class CordaRPCClient private constructor(
* *
* @param username The username to authenticate with. * @param username The username to authenticate with.
* @param password The password to authenticate with. * @param password The password to authenticate with.
* @param gracefulReconnect whether the connection will reconnect gracefully. * @param gracefulReconnect a [GracefulReconnect] class containing callback logic when the RPC is dis/reconnected unexpectedly
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/ */
@JvmOverloads @JvmOverloads
fun start(username: String, password: String, gracefulReconnect: Boolean = false): CordaRPCConnection { fun start(username: String, password: String, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
return start(username, password, null, null, gracefulReconnect) return start(username, password, null, null, gracefulReconnect)
} }
@ -388,11 +414,11 @@ class CordaRPCClient private constructor(
* @param username The username to authenticate with. * @param username The username to authenticate with.
* @param password The password to authenticate with. * @param password The password to authenticate with.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed. * @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
* @param gracefulReconnect whether the connection will reconnect gracefully. * @param gracefulReconnect a [GracefulReconnect] class containing callback logic when the RPC is dis/reconnected unexpectedly
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/ */
@JvmOverloads @JvmOverloads
fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: Boolean = false): CordaRPCConnection { fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
return start(username, password, null, null, targetLegalIdentity, gracefulReconnect) return start(username, password, null, null, targetLegalIdentity, gracefulReconnect)
} }
@ -406,11 +432,11 @@ class CordaRPCClient private constructor(
* @param password The password to authenticate with. * @param password The password to authenticate with.
* @param externalTrace external [Trace] for correlation. * @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made. * @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @param gracefulReconnect whether the connection will reconnect gracefully. * @param gracefulReconnect a [GracefulReconnect] class containing callback logic when the RPC is dis/reconnected unexpectedly
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/ */
@JvmOverloads @JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: Boolean = false): CordaRPCConnection { fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect) return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect)
} }
@ -425,19 +451,21 @@ class CordaRPCClient private constructor(
* @param externalTrace external [Trace] for correlation. * @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made. * @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed. * @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
* @param gracefulReconnect whether the connection will reconnect gracefully. * @param gracefulReconnect a [GracefulReconnect] class containing callback logic when the RPC is dis/reconnected unexpectedly.
* Note that when using graceful reconnect the values for [CordaRPCClientConfiguration.connectionMaxRetryInterval] and
* [CordaRPCClientConfiguration.maxReconnectAttempts] will be overridden in order to mangage the reconnects.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/ */
@JvmOverloads @JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: Boolean = false): CordaRPCConnection { fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
val addresses = if (haAddressPool.isEmpty()) { val addresses = if (haAddressPool.isEmpty()) {
listOf(hostAndPort!!) listOf(hostAndPort!!)
} else { } else {
haAddressPool haAddressPool
} }
return if (gracefulReconnect) { return if (gracefulReconnect != null) {
CordaRPCConnection.createWithGracefulReconnection(username, password, addresses) CordaRPCConnection.createWithGracefulReconnection(username, password, addresses, configuration, gracefulReconnect)
} else { } else {
CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity)) CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
} }

View File

@ -1,6 +1,7 @@
package net.corda.client.rpc.internal package net.corda.client.rpc.internal
import net.corda.client.rpc.* import net.corda.client.rpc.*
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*
import net.corda.client.rpc.reconnect.CouldNotStartFlowException import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div import net.corda.core.internal.div
@ -11,12 +12,14 @@ import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowHandle
import net.corda.core.utilities.* import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.seconds
import net.corda.nodeapi.exceptions.RejectedCommandException import net.corda.nodeapi.exceptions.RejectedCommandException
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
import rx.Observable
import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method import java.lang.reflect.Method
@ -52,22 +55,25 @@ class ReconnectingCordaRPCOps private constructor(
nodeHostAndPort: NetworkHostAndPort, nodeHostAndPort: NetworkHostAndPort,
username: String, username: String,
password: String, password: String,
rpcConfiguration: CordaRPCClientConfiguration,
sslConfiguration: ClientRpcSslOptions? = null, sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null, classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null observersPool: ExecutorService? = null
) : this( ) : this(
ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, sslConfiguration, classLoader), ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, rpcConfiguration, sslConfiguration, classLoader),
observersPool ?: Executors.newCachedThreadPool(), observersPool ?: Executors.newCachedThreadPool(),
observersPool != null) observersPool != null)
constructor( constructor(
nodeHostAndPorts: List<NetworkHostAndPort>, nodeHostAndPorts: List<NetworkHostAndPort>,
username: String, username: String,
password: String, password: String,
rpcConfiguration: CordaRPCClientConfiguration,
gracefulReconnect: GracefulReconnect? = null,
sslConfiguration: ClientRpcSslOptions? = null, sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null, classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null observersPool: ExecutorService? = null
) : this( ) : this(
ReconnectingRPCConnection(nodeHostAndPorts, username, password, sslConfiguration, classLoader), ReconnectingRPCConnection(nodeHostAndPorts, username, password, rpcConfiguration, sslConfiguration, classLoader, gracefulReconnect),
observersPool ?: Executors.newCachedThreadPool(), observersPool ?: Executors.newCachedThreadPool(),
observersPool != null) observersPool != null)
private companion object { private companion object {
@ -116,43 +122,59 @@ class ReconnectingCordaRPCOps private constructor(
val nodeHostAndPorts: List<NetworkHostAndPort>, val nodeHostAndPorts: List<NetworkHostAndPort>,
val username: String, val username: String,
val password: String, val password: String,
val rpcConfiguration: CordaRPCClientConfiguration,
val sslConfiguration: ClientRpcSslOptions? = null, val sslConfiguration: ClientRpcSslOptions? = null,
val classLoader: ClassLoader? val classLoader: ClassLoader?,
val gracefulReconnect: GracefulReconnect? = null
) : RPCConnection<CordaRPCOps> { ) : RPCConnection<CordaRPCOps> {
private var currentRPCConnection: CordaRPCConnection? = null private var currentRPCConnection: CordaRPCConnection? = null
enum class CurrentState { enum class CurrentState {
UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED
} }
private var currentState = CurrentState.UNCONNECTED
private var currentState = UNCONNECTED
init { init {
current current
} }
private val current: CordaRPCConnection private val current: CordaRPCConnection
@Synchronized get() = when (currentState) { @Synchronized get() = when (currentState) {
CurrentState.UNCONNECTED -> connect() UNCONNECTED -> connect()
CurrentState.CONNECTED -> currentRPCConnection!! CONNECTED -> currentRPCConnection!!
CurrentState.CLOSED -> throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.") CLOSED -> throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
CurrentState.CONNECTING, CurrentState.DIED -> throw IllegalArgumentException("Illegal state: $currentState ") CONNECTING, DIED -> throw IllegalArgumentException("Illegal state: $currentState ")
} }
/**
* Called on external error.
* Will block until the connection is established again.
*/
@Synchronized @Synchronized
fun reconnectOnError(e: Throwable) { private fun doReconnect(e: Throwable, previousConnection: CordaRPCConnection?) {
val previousConnection = currentRPCConnection if (previousConnection != currentRPCConnection) {
currentState = CurrentState.DIED // We've already done this, skip
return
}
// First one to get here gets to do all the reconnect logic, including calling onDisconnect and onReconnect. This makes sure
// that they're only called once per reconnect.
currentState = DIED
gracefulReconnect?.onDisconnect?.invoke()
//TODO - handle error cases //TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}") log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e) log.debug("", e)
connect() connect()
previousConnection?.forceClose() previousConnection?.forceClose()
gracefulReconnect?.onReconnect?.invoke()
}
/**
* Called on external error.
* Will block until the connection is established again.
*/
fun reconnectOnError(e: Throwable) {
val previousConnection = currentRPCConnection
doReconnect(e, previousConnection)
} }
@Synchronized @Synchronized
private fun connect(): CordaRPCConnection { private fun connect(): CordaRPCConnection {
currentState = CurrentState.CONNECTING currentState = CONNECTING
currentRPCConnection = establishConnectionWithRetry() currentRPCConnection = establishConnectionWithRetry()
currentState = CurrentState.CONNECTED currentState = CONNECTED
return currentRPCConnection!! return currentRPCConnection!!
} }
@ -161,7 +183,7 @@ class ReconnectingCordaRPCOps private constructor(
log.info("Connecting to: $attemptedAddress") log.info("Connecting to: $attemptedAddress")
try { try {
return CordaRPCClient( return CordaRPCClient(
attemptedAddress, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader attemptedAddress, rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader
).start(username, password).also { ).start(username, password).also {
// Check connection is truly operational before returning it. // Check connection is truly operational before returning it.
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) { require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
@ -204,63 +226,70 @@ class ReconnectingCordaRPCOps private constructor(
get() = current.serverProtocolVersion get() = current.serverProtocolVersion
@Synchronized @Synchronized
override fun notifyServerAndClose() { override fun notifyServerAndClose() {
currentState = CurrentState.CLOSED currentState = CLOSED
currentRPCConnection?.notifyServerAndClose() currentRPCConnection?.notifyServerAndClose()
} }
@Synchronized @Synchronized
override fun forceClose() { override fun forceClose() {
currentState = CurrentState.CLOSED currentState = CLOSED
currentRPCConnection?.forceClose() currentRPCConnection?.forceClose()
} }
@Synchronized @Synchronized
override fun close() { override fun close() {
currentState = CurrentState.CLOSED currentState = CLOSED
currentRPCConnection?.close() currentRPCConnection?.close()
} }
} }
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler { private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow") private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
val result: Any? = try { private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
log.debug { "Invoking RPC $method..." } if (method.isStartFlow()) {
method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { // Don't retry flows
log.debug { "RPC $method invoked successfully." } throw CouldNotStartFlowException(e.targetException)
} }
} catch (e: InvocationTargetException) { }
fun retry() = if (method.isStartFlow()) {
// Don't retry flows private fun doInvoke(method: Method, args: Array<out Any>?): Any? {
throw CouldNotStartFlowException(e.targetException) // will stop looping when [method.invoke] succeeds
} else { while (true) {
this.invoke(proxy, method, args) try {
} log.debug { "Invoking RPC $method..." }
when (e.targetException) { return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
is RejectedCommandException -> { log.debug { "RPC $method invoked successfully." }
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.reconnectOnError(e)
this.invoke(proxy, method, args)
} }
is ConnectionFailureException -> { } catch (e: InvocationTargetException) {
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) when (e.targetException) {
reconnectingRPCConnection.reconnectOnError(e) is RejectedCommandException -> {
retry() log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
} reconnectingRPCConnection.reconnectOnError(e)
is RPCException -> { }
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e) is ConnectionFailureException -> {
reconnectingRPCConnection.reconnectOnError(e) log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary reconnectingRPCConnection.reconnectOnError(e)
retry() checkIfIsStartFlow(method, e)
} }
else -> { is RPCException -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e) reconnectingRPCConnection.reconnectOnError(e)
retry() Thread.sleep(1000) // TODO - explain why this sleep is necessary
checkIfIsStartFlow(method, e)
}
else -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
checkIfIsStartFlow(method, e)
}
} }
} }
} }
}
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
return when (method.returnType) { return when (method.returnType) {
DataFeed::class.java -> { DataFeed::class.java -> {
// Intercept the data feed methods and returned a ReconnectingObservable instance // Intercept the data feed methods and return a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(result) val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args))
val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) { val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) {
// This handles reconnecting and creates new feeds. // This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
@ -268,10 +297,11 @@ class ReconnectingCordaRPCOps private constructor(
initialFeed.copy(updates = observable) initialFeed.copy(updates = observable)
} }
// TODO - add handlers for Observable return types. // TODO - add handlers for Observable return types.
else -> result else -> doInvoke(method, args)
} }
} }
} }
override fun close() { override fun close() {
if (!userPool) observersPool.shutdown() if (!userPool) observersPool.shutdown()
retryFlowsPool.shutdown() retryFlowsPool.shutdown()

View File

@ -44,8 +44,9 @@ Unreleased
* Added ``nodeDiagnosticInfo`` to the RPC API. The new RPC is also available as the ``run nodeDiagnosticInfo`` command executable from * Added ``nodeDiagnosticInfo`` to the RPC API. The new RPC is also available as the ``run nodeDiagnosticInfo`` command executable from
the Corda shell. It retrieves version information about the Corda platform and the CorDapps installed on the node. the Corda shell. It retrieves version information about the Corda platform and the CorDapps installed on the node.
* ``CordaRPCClient.start`` has a new ``gracefulReconnect`` parameter. When ``true`` (the default is ``false``) it will cause the RPC client * ``CordaRPCClient.start`` has a new ``gracefulReconnect`` parameter. The class ``GracefulReconnect`` takes two lambdas - one for callbacks
to try to automatically reconnect to the node on disconnect. Further any ``Observable`` s previously created will continue to vend new on disconnect, and one for callbacks on reconnection. When provided (ie. the ``gracefulReconnect`` parameter is not null) the RPC client
will to try to automatically reconnect to the node on disconnect. Further any ``Observable`` s previously created will continue to vend new
events on reconnect. events on reconnect.
.. note:: This is only best-effort and there are no guarantees of reliability. .. note:: This is only best-effort and there are no guarantees of reliability.

View File

@ -373,10 +373,29 @@ More specifically, the behaviour in the second case is a bit more subtle:
You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way: You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way:
.. sourcecode:: kotlin .. container:: codeset
val cordaClient = CordaRPCClient(nodeRpcAddress) .. sourcecode:: kotlin
val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = true).proxy
val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/})
val cordaClient = CordaRPCClient(nodeRpcAddress)
val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = gracefulReconnect).proxy
.. sourcecode:: java
private void onDisconnect() {
// Insert implementation
}
private void onReconnect() {
// Insert implementation
}
void method() {
GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect);
CordaRPCClient cordaClient = new CordaRPCClient(nodeRpcAddress);
CordaRPCConnection cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect);
}
Retrying flow invocations Retrying flow invocations
~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -1,7 +1,7 @@
package net.corda.node.services.rpc package net.corda.node.services.rpc
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.notUsed import net.corda.client.rpc.notUsed
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
@ -12,7 +12,10 @@ import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.builder import net.corda.core.node.services.vault.builder
import net.corda.core.utilities.* import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.schemas.CashSchemaV1
@ -36,8 +39,10 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.math.absoluteValue import kotlin.math.absoluteValue
import kotlin.math.max
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertTrue import kotlin.test.assertTrue
import kotlin.test.currentStackTrace
/** /**
* This is a stress test for the rpc reconnection logic, which triggers failures in a probabilistic way. * This is a stress test for the rpc reconnection logic, which triggers failures in a probabilistic way.
@ -114,10 +119,21 @@ class RpcReconnectTests {
val baseAmount = Amount.parseCurrency("0 USD") val baseAmount = Amount.parseCurrency("0 USD")
val issuerRef = OpaqueBytes.of(0x01) val issuerRef = OpaqueBytes.of(0x01)
var numDisconnects = 0
var numReconnects = 0
val maxStackOccurrences = AtomicInteger()
val addressesForRpc = addresses.map { it.proxyAddress } val addressesForRpc = addresses.map { it.proxyAddress }
// DOCSTART rpcReconnectingRPC // DOCSTART rpcReconnectingRPC
val onReconnect = {
numReconnects++
// We only expect to see a single reconnectOnError in the stack trace. Otherwise we're in danger of stack overflow recursion
maxStackOccurrences.set(max(maxStackOccurrences.get(), currentStackTrace().count { it.methodName == "reconnectOnError" }))
Unit
}
val reconnect = GracefulReconnect(onDisconnect = { numDisconnects++ }, onReconnect = onReconnect)
val client = CordaRPCClient(addressesForRpc) val client = CordaRPCClient(addressesForRpc)
val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect).proxy as ReconnectingCordaRPCOps
// DOCEND rpcReconnectingRPC // DOCEND rpcReconnectingRPC
// Observe the vault and collect the observations. // Observe the vault and collect the observations.
@ -266,6 +282,11 @@ class RpcReconnectTests {
val nrFailures = nrRestarts.get() val nrFailures = nrRestarts.get()
log.info("Checking results after $nrFailures restarts.") log.info("Checking results after $nrFailures restarts.")
// We should get one disconnect and one reconnect for each failure
assertThat(numDisconnects).isEqualTo(numReconnects)
assertThat(numReconnects).isLessThanOrEqualTo(nrFailures)
assertThat(maxStackOccurrences.get()).isLessThan(2)
// Query the vault and check that states were created for all flows. // Query the vault and check that states were created for all flows.
fun readCashStates() = bankAReconnectingRpc fun readCashStates() = bankAReconnectingRpc
.vaultQueryByWithPagingSpec(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.CONSUMED), PageSpecification(1, 10000)) .vaultQueryByWithPagingSpec(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.CONSUMED), PageSpecification(1, 10000))

View File

@ -1,6 +1,7 @@
package net.corda.bank.api package net.corda.bank.api
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
@ -43,7 +44,7 @@ object BankOfCordaClientApi {
*/ */
fun requestRPCIssueHA(availableRpcServers: List<NetworkHostAndPort>, params: IssueRequestParams): SignedTransaction { fun requestRPCIssueHA(availableRpcServers: List<NetworkHostAndPort>, params: IssueRequestParams): SignedTransaction {
// TODO: privileged security controls required // TODO: privileged security controls required
ReconnectingCordaRPCOps(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD).use { rpc-> ReconnectingCordaRPCOps(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD, CordaRPCClientConfiguration.DEFAULT).use { rpc->
rpc.waitUntilNetworkReady().getOrThrow() rpc.waitUntilNetworkReady().getOrThrow()
// Resolve parties via RPC // Resolve parties via RPC

View File

@ -91,7 +91,7 @@ object InteractiveShell {
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
rpcOps = { username: String, password: String -> rpcOps = { username: String, password: String ->
if (standalone) { if (standalone) {
ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, configuration.ssl, classLoader).also { ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT, configuration.ssl, classLoader).also {
rpcConn = it rpcConn = it
} }
} else { } else {

View File

@ -3,6 +3,7 @@ package net.corda.webserver.internal
import com.google.common.html.HtmlEscapers.htmlEscaper import com.google.common.html.HtmlEscapers.htmlEscaper
import io.netty.channel.unix.Errors import io.netty.channel.unix.Errors
import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.internal.errors.AddressBindingException import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
@ -174,7 +175,7 @@ class NodeWebServer(val config: WebServerConfig) {
} }
} }
private fun reconnectingCordaRPCOps() = ReconnectingCordaRPCOps(config.rpcAddress, config.runAs.username , config.runAs.password, null, javaClass.classLoader) private fun reconnectingCordaRPCOps() = ReconnectingCordaRPCOps(config.rpcAddress, config.runAs.username , config.runAs.password, CordaRPCClientConfiguration.DEFAULT, null, javaClass.classLoader)
/** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */ /** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */
val pluginRegistries: List<WebServerPluginRegistry> by lazy { val pluginRegistries: List<WebServerPluginRegistry> by lazy {