[CORDA-3098] - Close previous connection after reconnection (#5339)

This commit is contained in:
Dimos Raptis 2019-08-28 16:44:57 +01:00 committed by Shams Asari
parent 20585266a5
commit b97062bacc
5 changed files with 58 additions and 26 deletions

View File

@ -69,7 +69,6 @@ class RPCStabilityTests {
} }
@Test @Test
@Ignore("Ignored as it became increasingly flaky. CORDA-3098")
fun `client and server dont leak threads`() { fun `client and server dont leak threads`() {
fun startAndStop() { fun startAndStop() {
rpcDriver { rpcDriver {
@ -92,17 +91,16 @@ class RPCStabilityTests {
block() block()
} }
val threadsAfter = waitUntilNumberOfThreadsStable(executor) val threadsAfter = waitUntilNumberOfThreadsStable(executor)
// This is a less than check because threads from other tests may be shutting down while this test is running. val newThreads = threadsAfter.keys.minus(threadsBefore.keys)
// This is therefore a "best effort" check. When this test is run on its own this should be a strict equality. require(newThreads.isEmpty()) {
// In case of failure we output the threads along with their stacktraces to get an idea what was running at a time. "Threads have leaked. New threads created: $newThreads (total before: ${threadsBefore.size}, total after: ${threadsAfter.size})"
require(threadsBefore.keys.size >= threadsAfter.keys.size) { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" } }
} finally { } finally {
executor.shutdownNow() executor.shutdownNow()
} }
} }
@Test @Test
@Ignore("Ignored as it became increasingly flaky. CORDA-3098")
fun `client doesnt leak threads when it fails to start`() { fun `client doesnt leak threads when it fails to start`() {
fun startAndStop() { fun startAndStop() {
rpcDriver { rpcDriver {

View File

@ -407,7 +407,24 @@ class RPCClientProxyHandler(
} }
reaperScheduledFuture?.cancel(false) reaperScheduledFuture?.cancel(false)
val observablesMap = observableContext.observableMap.asMap()
observablesMap.keys.forEach { key ->
observationExecutorPool.run(key) {
try {
observablesMap[key]?.onError(ConnectionFailureException())
} catch (e: Exception) {
log.error("Unexpected exception when RPC connection failure handling", e)
}
}
}
observableContext.observableMap.invalidateAll() observableContext.observableMap.invalidateAll()
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(ConnectionFailureException())
}
rpcReplyMap.clear()
callSiteMap?.clear()
reapObservables(notify) reapObservables(notify)
reaperExecutor?.shutdownNow() reaperExecutor?.shutdownNow()
sendExecutor?.shutdownNow() sendExecutor?.shutdownNow()

View File

@ -143,11 +143,13 @@ class ReconnectingCordaRPCOps private constructor(
*/ */
@Synchronized @Synchronized
fun reconnectOnError(e: Throwable) { fun reconnectOnError(e: Throwable) {
val previousConnection = currentRPCConnection
currentState = CurrentState.DIED currentState = CurrentState.DIED
//TODO - handle error cases //TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}") log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e) log.debug("", e)
connect() connect()
previousConnection?.forceClose()
} }
@Synchronized @Synchronized
private fun connect(): CordaRPCConnection { private fun connect(): CordaRPCConnection {
@ -156,18 +158,20 @@ class ReconnectingCordaRPCOps private constructor(
currentState = CurrentState.CONNECTED currentState = CurrentState.CONNECTED
return currentRPCConnection!! return currentRPCConnection!!
} }
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, currentAuthenticationRetries: Int = 0): CordaRPCConnection {
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, currentAuthenticationRetries: Int = 0, roundRobinIndex: Int = 0): CordaRPCConnection {
var _currentAuthenticationRetries = currentAuthenticationRetries var _currentAuthenticationRetries = currentAuthenticationRetries
log.info("Connecting to: $nodeHostAndPorts") val attemptedAddress = nodeHostAndPorts[roundRobinIndex]
log.info("Connecting to: $attemptedAddress")
try { try {
return CordaRPCClient( return CordaRPCClient(
nodeHostAndPorts, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval), sslConfiguration, classLoader attemptedAddress, CordaRPCClientConfiguration(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader
).start(username, password).also { ).start(username, password).also {
// Check connection is truly operational before returning it. // Check connection is truly operational before returning it.
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) { require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
"Could not establish connection to $nodeHostAndPorts." "Could not establish connection to $attemptedAddress."
} }
log.debug { "Connection successfully established with: $nodeHostAndPorts" } log.debug { "Connection successfully established with: $attemptedAddress" }
} }
} catch (ex: Exception) { } catch (ex: Exception) {
when (ex) { when (ex) {
@ -199,7 +203,8 @@ class ReconnectingCordaRPCOps private constructor(
// Could not connect this time round - pause before giving another try. // Could not connect this time round - pause before giving another try.
Thread.sleep(retryInterval.toMillis()) Thread.sleep(retryInterval.toMillis())
// TODO - make the exponential retry factor configurable. // TODO - make the exponential retry factor configurable.
return establishConnectionWithRetry((retryInterval * 10) / 9, _currentAuthenticationRetries) val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size
return establishConnectionWithRetry((retryInterval * 10) / 9, _currentAuthenticationRetries, nextRoundRobinIndex)
} }
override val proxy: CordaRPCOps override val proxy: CordaRPCOps
get() = current.proxy get() = current.proxy

View File

@ -49,7 +49,9 @@ class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubs
if (unsubscribed) return if (unsubscribed) return
val subscriber = checkNotNull(this.subscriber.get()) val subscriber = checkNotNull(this.subscriber.get())
try { try {
val previousSubscription = backingSubscription
backingSubscription = dataFeed.updates.subscribe(subscriber::onNext, ::scheduleResubscribe, subscriber::onCompleted) backingSubscription = dataFeed.updates.subscribe(subscriber::onNext, ::scheduleResubscribe, subscriber::onCompleted)
previousSubscription?.unsubscribe()
} catch (e: Exception) { } catch (e: Exception) {
scheduleResubscribe(e) scheduleResubscribe(e)
} }

View File

@ -1,6 +1,9 @@
package net.corda.node.services.rpc package net.corda.node.services.rpc
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.notUsed
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
@ -9,10 +12,7 @@ import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.builder import net.corda.core.node.services.vault.builder
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.*
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.schemas.CashSchemaV1
@ -28,9 +28,11 @@ import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.User import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test import org.junit.Test
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.math.absoluteValue import kotlin.math.absoluteValue
@ -98,7 +100,7 @@ class RpcReconnectTests {
fun startBankA(address: NetworkHostAndPort) = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to address.toString())) fun startBankA(address: NetworkHostAndPort) = startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("rpcSettings.address" to address.toString()))
fun startProxy(addressPair: AddressPair) = RandomFailingProxy(serverPort = addressPair.proxyAddress.port, remotePort = addressPair.nodeAddress.port).start() fun startProxy(addressPair: AddressPair) = RandomFailingProxy(serverPort = addressPair.proxyAddress.port, remotePort = addressPair.nodeAddress.port).start()
val addresses = (1..3).map { getRandomAddressPair() } val addresses = (1..2).map { getRandomAddressPair() }
currentAddressPair = addresses[0] currentAddressPair = addresses[0]
proxy = startProxy(currentAddressPair) proxy = startProxy(currentAddressPair)
@ -114,7 +116,8 @@ class RpcReconnectTests {
val addressesForRpc = addresses.map { it.proxyAddress } val addressesForRpc = addresses.map { it.proxyAddress }
// DOCSTART rpcReconnectingRPC // DOCSTART rpcReconnectingRPC
val bankAReconnectingRpc = ReconnectingCordaRPCOps(addressesForRpc, demoUser.username, demoUser.password) val client = CordaRPCClient(addressesForRpc)
val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = true).proxy as ReconnectingCordaRPCOps
// DOCEND rpcReconnectingRPC // DOCEND rpcReconnectingRPC
// Observe the vault and collect the observations. // Observe the vault and collect the observations.
@ -186,7 +189,7 @@ class RpcReconnectTests {
log.info("Performing failover to a different node") log.info("Performing failover to a different node")
node.stop() node.stop()
proxy.stop() proxy.stop()
currentAddressPair = addresses[Random().nextInt(addresses.size)] currentAddressPair = (addresses - currentAddressPair).first()
node = startBankA(currentAddressPair.nodeAddress).get() node = startBankA(currentAddressPair.nodeAddress).get()
proxy = startProxy(currentAddressPair) proxy = startProxy(currentAddressPair)
} }
@ -214,6 +217,8 @@ class RpcReconnectTests {
log.info("Started flow $amount with flowId: $flowId") log.info("Started flow $amount with flowId: $flowId")
flowProgressEvents.addEvent(flowId, null) flowProgressEvents.addEvent(flowId, null)
flowHandle.stepsTreeFeed?.updates?.notUsed()
flowHandle.stepsTreeIndexFeed?.updates?.notUsed()
// No reconnecting possible. // No reconnecting possible.
flowHandle.progress.subscribe( flowHandle.progress.subscribe(
{ prog -> { prog ->
@ -246,9 +251,14 @@ class RpcReconnectTests {
log.info("Started all flows") log.info("Started all flows")
// Wait until all flows have been started. // Wait until all flows have been started.
flowsCountdownLatch.await() val flowsConfirmed = flowsCountdownLatch.await(10, TimeUnit.MINUTES)
if (flowsConfirmed) {
log.info("Confirmed all flows have started.")
} else {
log.info("Timed out waiting for confirmation that all flows have started. Remaining flows: ${flowsCountdownLatch.count}")
}
log.info("Confirmed all flows.")
// Wait for all events to come in and flows to finish. // Wait for all events to come in and flows to finish.
Thread.sleep(4000) Thread.sleep(4000)
@ -272,7 +282,7 @@ class RpcReconnectTests {
val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet() val allCash = allCashStates.map { it.state.data.amount.quantity }.toSet()
val missingCash = (1..NUMBER_OF_FLOWS_TO_RUN).filterNot { allCash.contains(it.toLong() * 100) } val missingCash = (1..NUMBER_OF_FLOWS_TO_RUN).filterNot { allCash.contains(it.toLong() * 100) }
log.info("MISSING: $missingCash") log.info("Missing cash states: $missingCash")
assertEquals(NUMBER_OF_FLOWS_TO_RUN, allCashStates.size, "Not all flows were executed successfully") assertEquals(NUMBER_OF_FLOWS_TO_RUN, allCashStates.size, "Not all flows were executed successfully")
@ -284,17 +294,17 @@ class RpcReconnectTests {
// Check that enough vault events were received. // Check that enough vault events were received.
// This check is fuzzy because events can go missing during node restarts. // This check is fuzzy because events can go missing during node restarts.
// Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart. // Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart.
assertTrue(vaultEvents!!.size + nrFailures * 3 >= NUMBER_OF_FLOWS_TO_RUN, "Not all vault events were received") assertThat(vaultEvents!!.size + nrFailures * 3).isGreaterThanOrEqualTo(NUMBER_OF_FLOWS_TO_RUN)
// DOCEND missingVaultEvents // DOCEND missingVaultEvents
// Check that no flow was triggered twice. // Check that no flow was triggered twice.
val duplicates = allCashStates.groupBy { it.state.data.amount }.filterValues { it.size > 1 } val duplicates = allCashStates.groupBy { it.state.data.amount }.filterValues { it.size > 1 }
assertTrue(duplicates.isEmpty(), "${duplicates.size} flows were retried illegally.") assertTrue(duplicates.isEmpty(), "${duplicates.size} flows were retried illegally.")
log.info("SM EVENTS: ${stateMachineEvents!!.size}") log.info("State machine events seen: ${stateMachineEvents!!.size}")
// State machine events are very likely to get lost more often because they seem to be sent with a delay. // State machine events are very likely to get lost more often because they seem to be sent with a delay.
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Added state machine events lost.") assertThat(stateMachineEvents.count { it is StateMachineUpdate.Added }).isGreaterThanOrEqualTo(NUMBER_OF_FLOWS_TO_RUN / 3)
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > NUMBER_OF_FLOWS_TO_RUN / 3, "Too many Removed state machine events lost.") assertThat(stateMachineEvents.count { it is StateMachineUpdate.Removed }).isGreaterThanOrEqualTo(NUMBER_OF_FLOWS_TO_RUN / 3)
// Stop the observers. // Stop the observers.
vaultSubscription.unsubscribe() vaultSubscription.unsubscribe()