From bfcd1d8791bef3b8335b2d7c4c3b280e1805e435 Mon Sep 17 00:00:00 2001 From: Tamas Veingartner Date: Mon, 8 Jun 2020 18:03:02 +0100 Subject: [PATCH 1/2] ENT-5379 Reconnecting RPC fixed to recognize shutdown calls and break reconnect attempts (#6316) --- .../CordaRPCClientReconnectionTest.kt | 23 +++++++++++++++++++ .../rpc/internal/ReconnectingCordaRPCOps.kt | 8 ++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt index 031b990b0a..70ae13731f 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt @@ -293,4 +293,27 @@ class CordaRPCClientReconnectionTest { .isInstanceOf(RPCException::class.java) } } + + @Test(timeout=300_000) + fun `rpc client does not attempt to reconnect after shutdown`() { + driver(DriverParameters(cordappsForAllNodes = emptyList())) { + val address = NetworkHostAndPort("localhost", portAllocator.nextPort()) + fun startNode(): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + ).getOrThrow() + } + + val node = startNode() + val client = CordaRPCClient(node.rpcAddress, config) + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use { + val rpcOps = it.proxy as ReconnectingCordaRPCOps + rpcOps.shutdown() + // If we get here we know we're not stuck in a reconnect cycle with a node that's been shut down + assertThat(rpcOps.reconnectingRPCConnection.isClosed()) + } + } + } } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 9fdfa6089a..af3c7ab1b5 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -311,13 +311,18 @@ class ReconnectingCordaRPCOps private constructor( checkIfClosed() var remainingAttempts = maxNumberOfAttempts var lastException: Throwable? = null - while (remainingAttempts != 0) { + while (remainingAttempts != 0 && !reconnectingRPCConnection.isClosed()) { try { log.debug { "Invoking RPC $method..." } return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { log.debug { "RPC $method invoked successfully." } } } catch (e: InvocationTargetException) { + if (method.name.equals("shutdown", true)) { + log.debug("Shutdown invoked, stop reconnecting.", e) + reconnectingRPCConnection.notifyServerAndClose() + break + } when (e.targetException) { is RejectedCommandException -> { log.warn("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e) @@ -349,6 +354,7 @@ class ReconnectingCordaRPCOps private constructor( } } + if (reconnectingRPCConnection.isClosed()) return null throw MaxRpcRetryException(maxNumberOfAttempts, method, lastException) } From a5a0c64e2d7064aaf43ecbcb7eae5af1a55a3c53 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Mon, 27 Jul 2020 14:09:26 +0100 Subject: [PATCH 2/2] CORDA-3918: Port of ENT-5417: Allow exceptions to propagate when shutdown commands are called (#6516) --- .../rpc/internal/ReconnectingCordaRPCOps.kt | 19 ++++++++++--------- .../net/corda/tools/shell/InteractiveShell.kt | 10 ++++++---- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index af3c7ab1b5..7db8597ba3 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -292,6 +292,7 @@ class ReconnectingCordaRPCOps private constructor( } private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler { private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow") + private fun Method.isShutdown() = name == "shutdown" || name == "gracefulShutdown" || name == "terminate" private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) { if (method.isStartFlow()) { @@ -306,7 +307,7 @@ class ReconnectingCordaRPCOps private constructor( * * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed. */ - @Suppress("ThrowsCount", "ComplexMethod") + @Suppress("ThrowsCount", "ComplexMethod", "NestedBlockDepth") private fun doInvoke(method: Method, args: Array?, maxNumberOfAttempts: Int): Any? { checkIfClosed() var remainingAttempts = maxNumberOfAttempts @@ -318,20 +319,20 @@ class ReconnectingCordaRPCOps private constructor( log.debug { "RPC $method invoked successfully." } } } catch (e: InvocationTargetException) { - if (method.name.equals("shutdown", true)) { - log.debug("Shutdown invoked, stop reconnecting.", e) - reconnectingRPCConnection.notifyServerAndClose() - break - } when (e.targetException) { is RejectedCommandException -> { log.warn("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e) reconnectingRPCConnection.reconnectOnError(e) } is ConnectionFailureException -> { - log.warn("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) - reconnectingRPCConnection.reconnectOnError(e) - checkIfIsStartFlow(method, e) + if (method.isShutdown()) { + log.debug("Shutdown invoked, stop reconnecting.", e) + reconnectingRPCConnection.notifyServerAndClose() + } else { + log.warn("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) + reconnectingRPCConnection.reconnectOnError(e) + checkIfIsStartFlow(method, e) + } } is RPCException -> { rethrowIfUnrecoverable(e.targetException as RPCException) diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index bc00b7b53a..d78f4be24f 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -111,6 +111,8 @@ object InteractiveShell { YAML } + private fun isShutdownCmd(cmd: String) = cmd == "shutdown" || cmd == "gracefulShutdown" || cmd == "terminate" + fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { makeRPCConnection = { username: String, password: String -> val connection = if (standalone) { @@ -623,6 +625,10 @@ object InteractiveShell { throw e.rootCause } } + if (isShutdownCmd(cmd)) { + out.println("Called 'shutdown' on the node.\nQuitting the shell now.").also { out.flush() } + onExit.invoke() + } } catch (e: StringToMethodCallParser.UnparseableCallException) { out.println(e.message, Decoration.bold, Color.red) if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) { @@ -634,10 +640,6 @@ object InteractiveShell { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } - if (cmd == "shutdown") { - out.println("Called 'shutdown' on the node.\nQuitting the shell now.").also { out.flush() } - onExit.invoke() - } return result }