From a204b50f5b05942208659570742af79cac48328d Mon Sep 17 00:00:00 2001 From: James Higgs <45565019+JamesHR3@users.noreply.github.com> Date: Fri, 19 Jul 2019 15:28:44 +0100 Subject: [PATCH] [CORDA-2923] Prevent connection threads leaking on reconnect (#5313) --- .../kotlin/net/corda/client/rpc/RPCStabilityTests.kt | 8 ++++---- .../corda/client/rpc/internal/ReconnectingCordaRPCOps.kt | 2 ++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 752d6e7196..6397a27458 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -91,10 +91,10 @@ class RPCStabilityTests { block() } val threadsAfter = waitUntilNumberOfThreadsStable(executor) - // This is a less than check because threads from other tests may be shutting down while this test is running. - // This is therefore a "best effort" check. When this test is run on its own this should be a strict equality. - // In case of failure we output the threads along with their stacktraces to get an idea what was running at a time. - require(threadsBefore.keys.size >= threadsAfter.keys.size) { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" } + val newThreads = threadsAfter.keys.minus(threadsBefore.keys) + require(newThreads.isEmpty()) { + "Threads have leaked. New threads created: $newThreads (total before: ${threadsBefore.size}, total after: ${threadsAfter.size})" + } } finally { executor.shutdownNow() } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index a4f1f9212c..2d47dfe640 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -157,6 +157,8 @@ class ReconnectingCordaRPCOps private constructor( */ @Synchronized fun reconnectOnError(e: Throwable) { + // Ensure any resources on this side are cleaned up before building a new connection + currentRPCConnection?.close() currentState = CurrentState.DIED //TODO - handle error cases log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")