diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index bcb479db1d..6397a27458 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -69,7 +69,6 @@ class RPCStabilityTests { } @Test - @Ignore("Ignored as it became increasingly flaky. CORDA-3098") fun `client and server dont leak threads`() { fun startAndStop() { rpcDriver { @@ -92,17 +91,16 @@ class RPCStabilityTests { block() } val threadsAfter = waitUntilNumberOfThreadsStable(executor) - // This is a less than check because threads from other tests may be shutting down while this test is running. - // This is therefore a "best effort" check. When this test is run on its own this should be a strict equality. - // In case of failure we output the threads along with their stacktraces to get an idea what was running at a time. - require(threadsBefore.keys.size >= threadsAfter.keys.size) { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" } + val newThreads = threadsAfter.keys.minus(threadsBefore.keys) + require(newThreads.isEmpty()) { + "Threads have leaked. New threads created: $newThreads (total before: ${threadsBefore.size}, total after: ${threadsAfter.size})" + } } finally { executor.shutdownNow() } } @Test - @Ignore("Ignored as it became increasingly flaky. CORDA-3098") fun `client doesnt leak threads when it fails to start`() { fun startAndStop() { rpcDriver { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 725ff71fff..6470cc9525 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -407,7 +407,24 @@ class RPCClientProxyHandler( } reaperScheduledFuture?.cancel(false) + val observablesMap = observableContext.observableMap.asMap() + observablesMap.keys.forEach { key -> + observationExecutorPool.run(key) { + try { + observablesMap[key]?.onError(ConnectionFailureException()) + } catch (e: Exception) { + log.error("Unexpected exception when RPC connection failure handling", e) + } + } + } observableContext.observableMap.invalidateAll() + rpcReplyMap.forEach { _, replyFuture -> + replyFuture.setException(ConnectionFailureException()) + } + + rpcReplyMap.clear() + callSiteMap?.clear() + reapObservables(notify) reaperExecutor?.shutdownNow() sendExecutor?.shutdownNow() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index db478b849e..e4bf248a1f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -143,11 +143,13 @@ class ReconnectingCordaRPCOps private constructor( */ @Synchronized fun reconnectOnError(e: Throwable) { + val previousConnection = currentRPCConnection currentState = CurrentState.DIED //TODO - handle error cases log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}") log.debug("", e) connect() + previousConnection?.forceClose() } @Synchronized private fun connect(): CordaRPCConnection { @@ -156,18 +158,20 @@ class ReconnectingCordaRPCOps private constructor( currentState = CurrentState.CONNECTED return currentRPCConnection!! } - private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, currentAuthenticationRetries: Int = 0): CordaRPCConnection { + + private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, currentAuthenticationRetries: Int = 0, roundRobinIndex: Int = 0): CordaRPCConnection { var _currentAuthenticationRetries = currentAuthenticationRetries - log.info("Connecting to: $nodeHostAndPorts") + val attemptedAddress = nodeHostAndPorts[roundRobinIndex] + log.info("Connecting to: $attemptedAddress") try { return CordaRPCClient( - nodeHostAndPorts, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval), sslConfiguration, classLoader + attemptedAddress, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader ).start(username, password).also { // Check connection is truly operational before returning it. require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) { - "Could not establish connection to $nodeHostAndPorts." + "Could not establish connection to $attemptedAddress." } - log.debug { "Connection successfully established with: $nodeHostAndPorts" } + log.debug { "Connection successfully established with: $attemptedAddress" } } } catch (ex: Exception) { when (ex) { @@ -199,7 +203,8 @@ class ReconnectingCordaRPCOps private constructor( // Could not connect this time round - pause before giving another try. Thread.sleep(retryInterval.toMillis()) // TODO - make the exponential retry factor configurable. - return establishConnectionWithRetry((retryInterval * 10) / 9, _currentAuthenticationRetries) + val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size + return establishConnectionWithRetry((retryInterval * 10) / 9, _currentAuthenticationRetries, nextRoundRobinIndex) } override val proxy: CordaRPCOps get() = current.proxy diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt index a8fea5e71f..545b40531e 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt @@ -49,7 +49,9 @@ class ReconnectingObservable private constructor(subscriber: ReconnectingSubs if (unsubscribed) return val subscriber = checkNotNull(this.subscriber.get()) try { + val previousSubscription = backingSubscription backingSubscription = dataFeed.updates.subscribe(subscriber::onNext, ::scheduleResubscribe, subscriber::onCompleted) + previousSubscription?.unsubscribe() } catch (e: Exception) { scheduleResubscribe(e) } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt index 35684b90fe..021d49219f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt @@ -1,6 +1,9 @@ package net.corda.node.services.rpc +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.internal.ReconnectingCordaRPCOps +import net.corda.client.rpc.notUsed import net.corda.core.contracts.Amount import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.concurrent.transpose @@ -9,10 +12,7 @@ import net.corda.core.node.services.Vault import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.builder -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.* import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.finance.schemas.CashSchemaV1 @@ -28,9 +28,11 @@ import net.corda.testing.driver.internal.OutOfProcessImpl import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.node.User import net.corda.testing.node.internal.FINANCE_CORDAPPS +import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread import kotlin.math.absoluteValue @@ -98,7 +100,7 @@ class RpcReconnectTests { fun startBankA(address: NetworkHostAndPort) = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to address.toString())) fun startProxy(addressPair: AddressPair) = RandomFailingProxy(serverPort = addressPair.proxyAddress.port, remotePort = addressPair.nodeAddress.port).start() - val addresses = (1..3).map { getRandomAddressPair() } + val addresses = (1..2).map { getRandomAddressPair() } currentAddressPair = addresses[0] proxy = startProxy(currentAddressPair) @@ -114,7 +116,8 @@ class RpcReconnectTests { val addressesForRpc = addresses.map { it.proxyAddress } // DOCSTART rpcReconnectingRPC - val bankAReconnectingRpc = ReconnectingCordaRPCOps(addressesForRpc, demoUser.username, demoUser.password) + val client = CordaRPCClient(addressesForRpc) + val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps // DOCEND rpcReconnectingRPC // Observe the vault and collect the observations. @@ -186,7 +189,7 @@ class RpcReconnectTests { log.info("Performing failover to a different node") node.stop() proxy.stop() - currentAddressPair = addresses[Random().nextInt(addresses.size)] + currentAddressPair = (addresses - currentAddressPair).first() node = startBankA(currentAddressPair.nodeAddress).get() proxy = startProxy(currentAddressPair) } @@ -214,6 +217,8 @@ class RpcReconnectTests { log.info("Started flow $amount with flowId: $flowId") flowProgressEvents.addEvent(flowId, null) + flowHandle.stepsTreeFeed?.updates?.notUsed() + flowHandle.stepsTreeIndexFeed?.updates?.notUsed() // No reconnecting possible. flowHandle.progress.subscribe( { prog -> @@ -246,9 +251,14 @@ class RpcReconnectTests { log.info("Started all flows") // Wait until all flows have been started. - flowsCountdownLatch.await() + val flowsConfirmed = flowsCountdownLatch.await(10, TimeUnit.MINUTES) + + if (flowsConfirmed) { + log.info("Confirmed all flows have started.") + } else { + log.info("Timed out waiting for confirmation that all flows have started. Remaining flows: ${flowsCountdownLatch.count}") + } - log.info("Confirmed all flows.") // Wait for all events to come in and flows to finish. Thread.sleep(4000) @@ -272,7 +282,7 @@ class RpcReconnectTests { val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet() val missingCash = (1..NUMBER_OF_FLOWS_TO_RUN).filterNot { allCash.contains(it.toLong() * 100) } - log.info("MISSING: $missingCash") + log.info("Missing cash states: $missingCash") assertEquals(NUMBER_OF_FLOWS_TO_RUN, allCashStates.size, "Not all flows were executed successfully") @@ -284,17 +294,17 @@ class RpcReconnectTests { // Check that enough vault events were received. // This check is fuzzy because events can go missing during node restarts. // Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart. - assertTrue(vaultEvents!!.size + nrFailures * 3 >= NUMBER_OF_FLOWS_TO_RUN, "Not all vault events were received") + assertThat(vaultEvents!!.size + nrFailures * 3).isGreaterThanOrEqualTo(NUMBER_OF_FLOWS_TO_RUN) // DOCEND missingVaultEvents // Check that no flow was triggered twice. val duplicates = allCashStates.groupBy { it.state.data.amount }.filterValues { it.size > 1 } assertTrue(duplicates.isEmpty(), "${duplicates.size} flows were retried illegally.") - log.info("SM EVENTS: ${stateMachineEvents!!.size}") + log.info("State machine events seen: ${stateMachineEvents!!.size}") // State machine events are very likely to get lost more often because they seem to be sent with a delay. - assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Added state machine events lost.") - assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Removed state machine events lost.") + assertThat(stateMachineEvents.count { it is StateMachineUpdate.Added }).isGreaterThanOrEqualTo(NUMBER_OF_FLOWS_TO_RUN / 3) + assertThat(stateMachineEvents.count { it is StateMachineUpdate.Removed }).isGreaterThanOrEqualTo(NUMBER_OF_FLOWS_TO_RUN / 3) // Stop the observers. vaultSubscription.unsubscribe()