mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
CORDA-2743: Tweak RPC reconnecting test. Adjust the exponential retry factor. (#5026)
This commit is contained in:
parent
870c930142
commit
5821ad5f5c
@ -141,24 +141,22 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
) : RPCConnection<CordaRPCOps> {
|
||||
private var currentRPCConnection: CordaRPCConnection? = null
|
||||
|
||||
init {
|
||||
connect()
|
||||
}
|
||||
|
||||
enum class CurrentState {
|
||||
UNCONNECTED, CONNECTED, CONNECTING, CLOSED, DIED
|
||||
}
|
||||
|
||||
private var currentState = CurrentState.UNCONNECTED
|
||||
|
||||
init {
|
||||
current
|
||||
}
|
||||
|
||||
private val current: CordaRPCConnection
|
||||
@Synchronized get() = when (currentState) {
|
||||
CurrentState.UNCONNECTED -> connect()
|
||||
CurrentState.CONNECTED -> currentRPCConnection!!
|
||||
CurrentState.UNCONNECTED, CurrentState.CLOSED -> {
|
||||
connect()
|
||||
currentRPCConnection!!
|
||||
}
|
||||
CurrentState.CONNECTING, CurrentState.DIED -> throw IllegalArgumentException("Illegal state")
|
||||
CurrentState.CLOSED -> throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
|
||||
CurrentState.CONNECTING, CurrentState.DIED -> throw IllegalArgumentException("Illegal state: $currentState ")
|
||||
}
|
||||
|
||||
/**
|
||||
@ -173,10 +171,12 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
connect()
|
||||
}
|
||||
|
||||
private fun connect() {
|
||||
@Synchronized
|
||||
private fun connect(): CordaRPCConnection {
|
||||
currentState = CurrentState.CONNECTING
|
||||
currentRPCConnection = establishConnectionWithRetry()
|
||||
currentState = CurrentState.CONNECTED
|
||||
return currentRPCConnection!!
|
||||
}
|
||||
|
||||
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, nrRetries: Int = 0): CordaRPCConnection {
|
||||
@ -196,7 +196,10 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
is ActiveMQSecurityException -> {
|
||||
// Happens when incorrect credentials provided.
|
||||
// It can happen at startup as well when the credentials are correct.
|
||||
if (nrRetries > 1) throw ex
|
||||
if (nrRetries > 1) {
|
||||
log.error("Failed to login to node.", ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
is RPCException -> {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
@ -211,14 +214,15 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
||||
}
|
||||
else -> {
|
||||
log.debug("Unknown exception upon establishing connection.", ex)
|
||||
log.warn("Unknown exception upon establishing connection.", ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
return establishConnectionWithRetry((retryInterval * 3) / 2, nrRetries + 1)
|
||||
// TODO - make the exponential retry factor configurable.
|
||||
return establishConnectionWithRetry((retryInterval * 10) / 9, nrRetries + 1)
|
||||
}
|
||||
|
||||
override val proxy: CordaRPCOps
|
||||
|
@ -244,7 +244,7 @@ class RpcReconnectTests {
|
||||
// Check that enough vault events were received.
|
||||
// This check is fuzzy because events can go missing during node restarts.
|
||||
// Ideally there should be nrOfFlowsToRun events receive but some might get lost for each restart.
|
||||
assertTrue(vaultEvents!!.size + nrFailures * 2 >= nrOfFlowsToRun, "Not all vault events were received")
|
||||
assertTrue(vaultEvents!!.size + nrFailures * 3 >= nrOfFlowsToRun, "Not all vault events were received")
|
||||
// DOCEND missingVaultEvents
|
||||
|
||||
// Query the vault and check that states were created for all flows.
|
||||
@ -264,8 +264,8 @@ class RpcReconnectTests {
|
||||
|
||||
log.info("SM EVENTS: ${stateMachineEvents!!.size}")
|
||||
// State machine events are very likely to get lost more often because they seem to be sent with a delay.
|
||||
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 2, "Too many Added state machine events lost.")
|
||||
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 2, "Too many Removed state machine events lost.")
|
||||
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Added } > nrOfFlowsToRun / 3, "Too many Added state machine events lost.")
|
||||
assertTrue(stateMachineEvents.count { it is StateMachineUpdate.Removed } > nrOfFlowsToRun / 3, "Too many Removed state machine events lost.")
|
||||
|
||||
// Stop the observers.
|
||||
vaultObserverHandle.stop()
|
||||
|
Loading…
x
Reference in New Issue
Block a user