diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index b4d4016824..197dee96d1 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -282,12 +282,13 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor( * * @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect * @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect - * @param maxRetries the maximum number of retries per each individual RPC call. A negative number indicates infinite number of retries. + * @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite number of retries. * The default value is 5. */ -class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxRetries: Int = 5) { - constructor(onDisconnect: Runnable, onReconnect: Runnable, maxRetries: Int = 5) : - this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxRetries = maxRetries) +class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) { + @JvmOverloads + constructor(onDisconnect: Runnable, onReconnect: Runnable, maxAttempts: Int = 5) : + this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxAttempts = maxAttempts) } /** diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index af81268a58..89e60ec7f7 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -20,7 +20,6 @@ import net.corda.nodeapi.exceptions.RejectedCommandException import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException -import java.lang.RuntimeException import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method @@ -252,14 +251,14 @@ class ReconnectingCordaRPCOps private constructor( /** * This method retries the invoked operation in a loop by re-establishing the connection when there is a problem - * and checking if the [maxNumberOfRetries] has been exhausted. + * and checking if the [maxNumberOfAttempts] has been exhausted. * - * A negative number for [maxNumberOfRetries] means an unlimited number of retries will be performed. + * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed. */ - private fun doInvoke(method: Method, args: Array?, maxNumberOfRetries: Int): Any? { - var remainingRetries = maxNumberOfRetries + private fun doInvoke(method: Method, args: Array?, maxNumberOfAttempts: Int): Any? { + var remainingAttempts = maxNumberOfAttempts var lastException: Throwable? = null - while (remainingRetries != 0) { + while (remainingAttempts != 0) { try { log.debug { "Invoking RPC $method..." } return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { @@ -289,11 +288,11 @@ class ReconnectingCordaRPCOps private constructor( } } lastException = e.targetException - remainingRetries-- + remainingAttempts-- } } - throw MaxRpcRetryException(maxNumberOfRetries, lastException) + throw MaxRpcRetryException(maxNumberOfAttempts, lastException) } override fun invoke(proxy: Any, method: Method, args: Array?): Any? { @@ -301,7 +300,7 @@ class ReconnectingCordaRPCOps private constructor( DataFeed::class.java -> { // Intercept the data feed methods and return a ReconnectingObservable instance val initialFeed: DataFeed = uncheckedCast(doInvoke(method, args, - reconnectingRPCConnection.gracefulReconnect.maxRetries)) + reconnectingRPCConnection.gracefulReconnect.maxAttempts)) val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) { // This handles reconnecting and creates new feeds. uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) @@ -309,7 +308,7 @@ class ReconnectingCordaRPCOps private constructor( initialFeed.copy(updates = observable) } // TODO - add handlers for Observable return types. - else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxRetries) + else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts) } } } diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 02e00552b2..61feef9a80 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -371,13 +371,19 @@ More specifically, the behaviour in the second case is a bit more subtle: .. warning:: In this approach, some events might be lost during a reconnection and not sent from the subscribed ``Observable``\s. -You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way: +You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter, which is an object containing 3 optional fields: + +* ``onDisconnect``: A callback handler that will be invoked every time the connection is disconnected. +* ``onReconnect``: A callback handler that will be invoked every time the connection is established again after a disconnection. +* ``maxAttempts``: The maximum number of attempts that will be performed per RPC operation. A negative value implies infinite retries. The default value is 5. + +This can be used in the following way: .. container:: codeset .. sourcecode:: kotlin - val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/}) + val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/}, maxAttempts = 3) val cordaClient = CordaRPCClient(nodeRpcAddress) val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = gracefulReconnect).proxy @@ -392,7 +398,7 @@ You can enable this graceful form of reconnection by using the ``gracefulReconne } void method() { - GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect); + GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect, 3); CordaRPCClient cordaClient = new CordaRPCClient(nodeRpcAddress); CordaRPCConnection cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect); }