From d983b7dd7073272c76553d937b3900608aa2bfa5 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Wed, 9 Oct 2019 13:43:22 +0100 Subject: [PATCH] CORDA-3281: Add a check for shutdown to avoid some of the errors (#5562) from the reconnecting RPC when we know the server is going away --- .../rpc/internal/ReconnectingCordaRPCOps.kt | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index a6542d1451..0967e99edc 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -89,7 +89,12 @@ class ReconnectingCordaRPCOps private constructor( * * Note that this method does not guarantee 100% that the flow will not be started twice. */ - fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) { + fun runFlowWithLogicalRetry( + runFlow: (CordaRPCOps) -> StateMachineRunId, + hasFlowStarted: (CordaRPCOps) -> Boolean, + onFlowConfirmed: () -> Unit = {}, + timeout: Duration = 4.seconds + ) { try { runFlow(this) onFlowConfirmed() @@ -249,39 +254,40 @@ class ReconnectingCordaRPCOps private constructor( } } - private fun doInvoke(method: Method, args: Array?): Any? { - // will stop looping when [method.invoke] succeeds - while (true) { - try { - log.debug { "Invoking RPC $method..." } - return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { - log.debug { "RPC $method invoked successfully." } + private tailrec fun doInvoke(method: Method, args: Array?): Any? { + // will stop recursing when [method.invoke] succeeds + try { + log.debug { "Invoking RPC $method..." } + return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { + log.debug { "RPC $method invoked successfully." } + } + } catch (e: InvocationTargetException) { + when (e.targetException) { + is RejectedCommandException -> { + log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e) + reconnectingRPCConnection.reconnectOnError(e) } - } catch (e: InvocationTargetException) { - when (e.targetException) { - is RejectedCommandException -> { - log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e) - reconnectingRPCConnection.reconnectOnError(e) - } - is ConnectionFailureException -> { + is ConnectionFailureException -> { + if (!reconnectingRPCConnection.proxy.isWaitingForShutdown()) { log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) reconnectingRPCConnection.reconnectOnError(e) - checkIfIsStartFlow(method, e) - } - is RPCException -> { - log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e) - reconnectingRPCConnection.reconnectOnError(e) - Thread.sleep(1000) // TODO - explain why this sleep is necessary - checkIfIsStartFlow(method, e) - } - else -> { - log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) - reconnectingRPCConnection.reconnectOnError(e) - checkIfIsStartFlow(method, e) } + checkIfIsStartFlow(method, e) + } + is RPCException -> { + log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e) + reconnectingRPCConnection.reconnectOnError(e) + Thread.sleep(1000) // TODO - explain why this sleep is necessary + checkIfIsStartFlow(method, e) + } + else -> { + log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) + reconnectingRPCConnection.reconnectOnError(e) + checkIfIsStartFlow(method, e) } } } + return doInvoke(method, args) } override fun invoke(proxy: Any, method: Method, args: Array?): Any? {