mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
CORDA-3542: Use the config values for reconnecting retry interval and max reconnect attempts (#5869)
This commit is contained in:
parent
3a34b0d087
commit
d16e1126db
@ -7,7 +7,6 @@ import net.corda.client.rpc.GracefulReconnect
|
||||
import net.corda.client.rpc.MaxRpcRetryException
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.messaging.startTrackedFlow
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
@ -26,7 +25,6 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS
|
||||
import net.corda.testing.node.internal.rpcDriver
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import java.lang.Thread.sleep
|
||||
import java.time.Duration
|
||||
@ -41,6 +39,10 @@ class CordaRPCClientReconnectionTest {
|
||||
private val portAllocator = incrementalPortAllocation()
|
||||
|
||||
private val gracefulReconnect = GracefulReconnect()
|
||||
private val config = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
connectionRetryInterval = Duration.ofSeconds(1),
|
||||
connectionRetryIntervalMultiplier = 1.0
|
||||
)
|
||||
|
||||
companion object {
|
||||
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
|
||||
@ -61,7 +63,7 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress)
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
@ -99,7 +101,7 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress)
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
@ -138,7 +140,7 @@ class CordaRPCClientReconnectionTest {
|
||||
val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))
|
||||
|
||||
val node = startNode(addresses[0])
|
||||
val client = CordaRPCClient(addresses)
|
||||
val client = CordaRPCClient(addresses, config)
|
||||
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
@ -175,7 +177,7 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress)
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = GracefulReconnect(maxAttempts = 1))).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
@ -189,11 +191,11 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 60_000)
|
||||
fun `establishing an RPC connection fails if there is no node listening to the specified address`() {
|
||||
rpcDriver {
|
||||
assertThatThrownBy {
|
||||
CordaRPCClient(NetworkHostAndPort("localhost", portAllocator.nextPort()))
|
||||
CordaRPCClient(NetworkHostAndPort("localhost", portAllocator.nextPort()), config)
|
||||
.start(rpcUser.username, rpcUser.password, GracefulReconnect())
|
||||
}.isInstanceOf(RPCException::class.java)
|
||||
.hasMessage("Cannot connect to server(s). Tried with all available servers.")
|
||||
@ -213,7 +215,7 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
CordaRPCClient(node.rpcAddress).start(rpcUser.username, rpcUser.password, gracefulReconnect).use {
|
||||
CordaRPCClient(node.rpcAddress, config).start(rpcUser.username, rpcUser.password, gracefulReconnect).use {
|
||||
node.stop()
|
||||
thread() {
|
||||
it.proxy.startTrackedFlow(
|
||||
@ -230,4 +232,23 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `RPC connection stops reconnecting after config number of retries`() {
|
||||
driver(DriverParameters(cordappsForAllNodes = emptyList())) {
|
||||
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
|
||||
val conf = config.copy(maxReconnectAttempts = 2)
|
||||
fun startNode(): NodeHandle = startNode(
|
||||
providedName = CHARLIE_NAME,
|
||||
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
|
||||
customOverrides = mapOf("rpcSettings.address" to address.toString())
|
||||
).getOrThrow()
|
||||
|
||||
val node = startNode()
|
||||
val connection = CordaRPCClient(node.rpcAddress, conf).start(rpcUser.username, rpcUser.password, gracefulReconnect)
|
||||
node.stop()
|
||||
// After two tries we throw RPCException
|
||||
assertThatThrownBy { connection.proxy.isWaitingForShutdown() }
|
||||
.isInstanceOf(RPCException::class.java)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -285,8 +285,8 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
|
||||
*
|
||||
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
|
||||
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
|
||||
* @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite number of retries.
|
||||
* The default value is 5.
|
||||
* @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite
|
||||
* number of retries. The default value is 5.
|
||||
*/
|
||||
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
|
||||
@Suppress("unused") // constructor for java
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import java.lang.reflect.Method
|
||||
|
||||
/**
|
||||
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked method.
|
||||
@ -19,8 +20,8 @@ open class UnrecoverableRPCException(message: String?, cause: Throwable? = null)
|
||||
* @param maxNumberOfRetries the number of retries that had been performed.
|
||||
* @param cause the cause of the last failed attempt.
|
||||
*/
|
||||
class MaxRpcRetryException(maxNumberOfRetries: Int, cause: Throwable?):
|
||||
RPCException("Max number of retries ($maxNumberOfRetries) was reached.", cause)
|
||||
class MaxRpcRetryException(maxNumberOfRetries: Int, method: Method, cause: Throwable?):
|
||||
RPCException("Max number of retries ($maxNumberOfRetries) for this RPC operation (${method.name}) was reached.", cause)
|
||||
|
||||
/**
|
||||
* Signals that the underlying [RPCConnection] dropped.
|
||||
|
@ -16,7 +16,6 @@ import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConn
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
|
||||
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.internal.times
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
@ -154,7 +153,7 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
@Synchronized get() = when (currentState) {
|
||||
// The first attempt to establish a connection will try every address only once.
|
||||
UNCONNECTED ->
|
||||
connect(infiniteRetries = false) ?: throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
|
||||
connect(nodeHostAndPorts.size) ?: throw IllegalArgumentException("The ReconnectingRPCConnection has been closed.")
|
||||
CONNECTED ->
|
||||
currentRPCConnection!!
|
||||
CLOSED ->
|
||||
@ -180,7 +179,7 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
//TODO - handle error cases
|
||||
log.warn("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
|
||||
log.debug("", e)
|
||||
connect(infiniteRetries = true)
|
||||
connect(rpcConfiguration.maxReconnectAttempts)
|
||||
previousConnection?.forceClose()
|
||||
gracefulReconnect.onReconnect.invoke()
|
||||
}
|
||||
@ -192,14 +191,13 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
val previousConnection = currentRPCConnection
|
||||
doReconnect(e, previousConnection)
|
||||
}
|
||||
private fun connect(infiniteRetries: Boolean): CordaRPCConnection? {
|
||||
private fun connect(maxConnectAttempts: Int): CordaRPCConnection? {
|
||||
currentState = CONNECTING
|
||||
synchronized(this) {
|
||||
currentRPCConnection = if (infiniteRetries) {
|
||||
establishConnectionWithRetry(rpcConfiguration.connectionRetryInterval)
|
||||
} else {
|
||||
establishConnectionWithRetry(rpcConfiguration.connectionRetryInterval, retries = nodeHostAndPorts.size)
|
||||
}
|
||||
currentRPCConnection = establishConnectionWithRetry(
|
||||
rpcConfiguration.connectionRetryInterval,
|
||||
retries = maxConnectAttempts
|
||||
)
|
||||
// It's possible we could get closed while waiting for the connection to establish.
|
||||
if (!isClosed()) {
|
||||
currentState = CONNECTED
|
||||
@ -257,17 +255,17 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
log.warn("Unknown exception [${ex.javaClass.name}] upon establishing connection.", ex)
|
||||
}
|
||||
}
|
||||
|
||||
if (retries == 0) {
|
||||
throw RPCException("Cannot connect to server(s). Tried with all available servers.", ex)
|
||||
}
|
||||
}
|
||||
val remainingRetries = if (retries < 0) retries else (retries - 1)
|
||||
if (remainingRetries == 0) {
|
||||
throw RPCException("Cannot connect to server(s). Tried with all available servers.")
|
||||
}
|
||||
// Could not connect this time round - pause before giving another try.
|
||||
Thread.sleep(retryInterval.toMillis())
|
||||
// TODO - make the exponential retry factor configurable.
|
||||
val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size
|
||||
val remainingRetries = if (retries < 0) retries else (retries - 1)
|
||||
return establishConnectionWithRetry((retryInterval * 10) / 9, nextRoundRobinIndex, remainingRetries)
|
||||
val nextInterval = retryInterval * rpcConfiguration.connectionRetryIntervalMultiplier
|
||||
return establishConnectionWithRetry(nextInterval, nextRoundRobinIndex, remainingRetries)
|
||||
}
|
||||
override val proxy: CordaRPCOps
|
||||
get() = current.proxy
|
||||
@ -346,7 +344,7 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
}
|
||||
}
|
||||
|
||||
throw MaxRpcRetryException(maxNumberOfAttempts, lastException)
|
||||
throw MaxRpcRetryException(maxNumberOfAttempts, method, lastException)
|
||||
}
|
||||
|
||||
private fun checkIfClosed() {
|
||||
|
@ -55,6 +55,7 @@ import java.util.zip.Deflater
|
||||
import java.util.zip.ZipEntry
|
||||
import java.util.zip.ZipOutputStream
|
||||
import kotlin.collections.LinkedHashSet
|
||||
import kotlin.math.roundToLong
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.reflect.full.createInstance
|
||||
|
||||
@ -75,6 +76,7 @@ infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(th
|
||||
|
||||
operator fun Duration.div(divider: Long): Duration = dividedBy(divider)
|
||||
operator fun Duration.times(multiplicand: Long): Duration = multipliedBy(multiplicand)
|
||||
operator fun Duration.times(multiplicand: Double): Duration = Duration.ofNanos((toNanos() * multiplicand).roundToLong())
|
||||
|
||||
/**
|
||||
* Returns the single element matching the given [predicate], or `null` if the collection is empty, or throws exception
|
||||
|
@ -362,6 +362,7 @@ A more graceful form of reconnection is also available. This will:
|
||||
|
||||
- reconnect any existing ``Observable``\s after a reconnection, so that they keep emitting events to the existing subscriptions.
|
||||
- block any RPC calls that arrive during a reconnection or any RPC calls that were not acknowledged at the point of reconnection and will execute them after the connection is re-established.
|
||||
- by default continue retrying indefinitely until the connection is re-established. See ``CordaRPCClientConfiguration.maxReconnectAttempts`` for adjusting the number of retries.
|
||||
|
||||
More specifically, the behaviour in the second case is a bit more subtle:
|
||||
|
||||
@ -377,7 +378,7 @@ You can enable this graceful form of reconnection by using the ``gracefulReconne
|
||||
|
||||
* ``onDisconnect``: A callback handler that will be invoked every time the connection is disconnected.
|
||||
* ``onReconnect``: A callback handler that will be invoked every time the connection is established again after a disconnection.
|
||||
* ``maxAttempts``: The maximum number of attempts that will be performed per RPC operation. A negative value implies infinite retries. The default value is 5.
|
||||
* ``maxAttempts``: The maximum number of attempts that will be performed per *RPC operation*. A negative value implies infinite retries. The default value is 5.
|
||||
|
||||
This can be used in the following way:
|
||||
|
||||
|
@ -134,7 +134,10 @@ class RpcReconnectTests {
|
||||
Unit
|
||||
}
|
||||
val reconnect = GracefulReconnect(onDisconnect = { numDisconnects++ }, onReconnect = onReconnect)
|
||||
val config = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds)
|
||||
val config = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
connectionRetryInterval = 1.seconds,
|
||||
connectionRetryIntervalMultiplier = 1.0
|
||||
)
|
||||
val client = CordaRPCClient(addressesForRpc, configuration = config)
|
||||
val bankAReconnectingRPCConnection = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect)
|
||||
val bankAReconnectingRpc = bankAReconnectingRPCConnection.proxy as ReconnectingCordaRPCOps
|
||||
|
@ -2,6 +2,7 @@ package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.*
|
||||
@ -42,6 +43,8 @@ import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertNotNull
|
||||
|
||||
class FlowRetryTest {
|
||||
val config = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryIntervalMultiplier = 1.1)
|
||||
|
||||
@Before
|
||||
fun resetCounters() {
|
||||
InitiatorFlow.seen.clear()
|
||||
@ -69,7 +72,7 @@ class FlowRetryTest {
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
val result = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
|
||||
}
|
||||
result
|
||||
@ -87,7 +90,7 @@ class FlowRetryTest {
|
||||
)) {
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
@ -103,7 +106,7 @@ class FlowRetryTest {
|
||||
)) {
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
val result = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::RetryFlow).returnValue.getOrThrow()
|
||||
}
|
||||
result
|
||||
@ -121,7 +124,7 @@ class FlowRetryTest {
|
||||
)) {
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
val result = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::ThrowingFlow).returnValue.getOrThrow()
|
||||
}
|
||||
result
|
||||
@ -136,7 +139,7 @@ class FlowRetryTest {
|
||||
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
assertFailsWith<TimeoutException> {
|
||||
it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
|
||||
@ -155,7 +158,7 @@ class FlowRetryTest {
|
||||
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
assertFailsWith<TimeoutException> {
|
||||
it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
|
||||
@ -175,7 +178,7 @@ class FlowRetryTest {
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
assertFailsWith<CordaRuntimeException> {
|
||||
it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
|
||||
}
|
||||
@ -193,7 +196,7 @@ class FlowRetryTest {
|
||||
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
|
||||
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
|
||||
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
|
||||
}.withMessageStartingWith("User not authorized to perform RPC call")
|
||||
|
Loading…
x
Reference in New Issue
Block a user