mirror of
https://github.com/corda/corda.git
synced 2025-06-08 10:21:44 +00:00
Merge pull request #5585 from corda/CORDA-3304-rpc-max-retries
[CORDA-3304] Introduce max number of retries per invocation for recon…
This commit is contained in:
commit
18fbd93268
6
.idea/codeStyles/Project.xml
generated
6
.idea/codeStyles/Project.xml
generated
@ -3,6 +3,10 @@
|
|||||||
<option name="LINE_SEPARATOR" value=" " />
|
<option name="LINE_SEPARATOR" value=" " />
|
||||||
<option name="RIGHT_MARGIN" value="140" />
|
<option name="RIGHT_MARGIN" value="140" />
|
||||||
<option name="SOFT_MARGINS" value="140" />
|
<option name="SOFT_MARGINS" value="140" />
|
||||||
|
<JavaCodeStyleSettings>
|
||||||
|
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
|
||||||
|
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
|
||||||
|
</JavaCodeStyleSettings>
|
||||||
<GroovyCodeStyleSettings>
|
<GroovyCodeStyleSettings>
|
||||||
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
|
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
|
||||||
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
|
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
|
||||||
@ -15,6 +19,8 @@
|
|||||||
<package name="tornadofx" withSubpackages="false" static="false" />
|
<package name="tornadofx" withSubpackages="false" static="false" />
|
||||||
</value>
|
</value>
|
||||||
</option>
|
</option>
|
||||||
|
<option name="NAME_COUNT_TO_USE_STAR_IMPORT" value="2147483647" />
|
||||||
|
<option name="NAME_COUNT_TO_USE_STAR_IMPORT_FOR_MEMBERS" value="2147483647" />
|
||||||
<option name="CONTINUATION_INDENT_IN_PARAMETER_LISTS" value="true" />
|
<option name="CONTINUATION_INDENT_IN_PARAMETER_LISTS" value="true" />
|
||||||
<option name="CONTINUATION_INDENT_IN_ARGUMENT_LISTS" value="true" />
|
<option name="CONTINUATION_INDENT_IN_ARGUMENT_LISTS" value="true" />
|
||||||
<option name="CONTINUATION_INDENT_FOR_EXPRESSION_BODIES" value="true" />
|
<option name="CONTINUATION_INDENT_FOR_EXPRESSION_BODIES" value="true" />
|
||||||
|
1
.idea/codeStyles/codeStyleConfig.xml
generated
1
.idea/codeStyles/codeStyleConfig.xml
generated
@ -1,6 +1,5 @@
|
|||||||
<component name="ProjectCodeStyleConfiguration">
|
<component name="ProjectCodeStyleConfiguration">
|
||||||
<state>
|
<state>
|
||||||
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
|
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
|
||||||
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
|
|
||||||
</state>
|
</state>
|
||||||
</component>
|
</component>
|
@ -282,10 +282,13 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
|
|||||||
*
|
*
|
||||||
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
|
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
|
||||||
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
|
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
|
||||||
|
* @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite number of retries.
|
||||||
|
* The default value is 5.
|
||||||
*/
|
*/
|
||||||
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}) {
|
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
|
||||||
constructor(onDisconnect: Runnable, onReconnect: Runnable ) :
|
@JvmOverloads
|
||||||
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() })
|
constructor(onDisconnect: Runnable, onReconnect: Runnable, maxAttempts: Int = 5) :
|
||||||
|
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxAttempts = maxAttempts)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -9,6 +9,14 @@ open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeExcep
|
|||||||
constructor(msg: String) : this(msg, null)
|
constructor(msg: String) : this(msg, null)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown to indicate an RPC operation has been retried for the [maxNumberOfRetries] unsuccessfully.
|
||||||
|
* @param maxNumberOfRetries the number of retries that had been performed.
|
||||||
|
* @param cause the cause of the last failed attempt.
|
||||||
|
*/
|
||||||
|
class MaxRpcRetryException(maxNumberOfRetries: Int, cause: Throwable?):
|
||||||
|
RPCException("Max number of retries ($maxNumberOfRetries) was reached.", cause)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signals that the underlying [RPCConnection] dropped.
|
* Signals that the underlying [RPCConnection] dropped.
|
||||||
*/
|
*/
|
||||||
|
@ -55,7 +55,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
username: String,
|
username: String,
|
||||||
password: String,
|
password: String,
|
||||||
rpcConfiguration: CordaRPCClientConfiguration,
|
rpcConfiguration: CordaRPCClientConfiguration,
|
||||||
gracefulReconnect: GracefulReconnect? = null,
|
gracefulReconnect: GracefulReconnect = GracefulReconnect(),
|
||||||
sslConfiguration: ClientRpcSslOptions? = null,
|
sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
classLoader: ClassLoader? = null,
|
classLoader: ClassLoader? = null,
|
||||||
observersPool: ExecutorService
|
observersPool: ExecutorService
|
||||||
@ -117,7 +117,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
val rpcConfiguration: CordaRPCClientConfiguration,
|
val rpcConfiguration: CordaRPCClientConfiguration,
|
||||||
val sslConfiguration: ClientRpcSslOptions? = null,
|
val sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
val classLoader: ClassLoader?,
|
val classLoader: ClassLoader?,
|
||||||
val gracefulReconnect: GracefulReconnect? = null,
|
val gracefulReconnect: GracefulReconnect = GracefulReconnect(),
|
||||||
val observersPool: ExecutorService
|
val observersPool: ExecutorService
|
||||||
) : RPCConnection<CordaRPCOps> {
|
) : RPCConnection<CordaRPCOps> {
|
||||||
private var currentRPCConnection: CordaRPCConnection? = null
|
private var currentRPCConnection: CordaRPCConnection? = null
|
||||||
@ -147,13 +147,13 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
// First one to get here gets to do all the reconnect logic, including calling onDisconnect and onReconnect. This makes sure
|
// First one to get here gets to do all the reconnect logic, including calling onDisconnect and onReconnect. This makes sure
|
||||||
// that they're only called once per reconnect.
|
// that they're only called once per reconnect.
|
||||||
currentState = DIED
|
currentState = DIED
|
||||||
gracefulReconnect?.onDisconnect?.invoke()
|
gracefulReconnect.onDisconnect.invoke()
|
||||||
//TODO - handle error cases
|
//TODO - handle error cases
|
||||||
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
|
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
|
||||||
log.debug("", e)
|
log.debug("", e)
|
||||||
connect()
|
connect()
|
||||||
previousConnection?.forceClose()
|
previousConnection?.forceClose()
|
||||||
gracefulReconnect?.onReconnect?.invoke()
|
gracefulReconnect.onReconnect.invoke()
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* Called on external error.
|
* Called on external error.
|
||||||
@ -249,9 +249,16 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun doInvoke(method: Method, args: Array<out Any>?): Any? {
|
/**
|
||||||
// will stop looping when [method.invoke] succeeds
|
* This method retries the invoked operation in a loop by re-establishing the connection when there is a problem
|
||||||
while (true) {
|
* and checking if the [maxNumberOfAttempts] has been exhausted.
|
||||||
|
*
|
||||||
|
* A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed.
|
||||||
|
*/
|
||||||
|
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? {
|
||||||
|
var remainingAttempts = maxNumberOfAttempts
|
||||||
|
var lastException: Throwable? = null
|
||||||
|
while (remainingAttempts != 0) {
|
||||||
try {
|
try {
|
||||||
log.debug { "Invoking RPC $method..." }
|
log.debug { "Invoking RPC $method..." }
|
||||||
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
|
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
|
||||||
@ -280,15 +287,20 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
checkIfIsStartFlow(method, e)
|
checkIfIsStartFlow(method, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
lastException = e.targetException
|
||||||
|
remainingAttempts--
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
throw MaxRpcRetryException(maxNumberOfAttempts, lastException)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
|
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
|
||||||
return when (method.returnType) {
|
return when (method.returnType) {
|
||||||
DataFeed::class.java -> {
|
DataFeed::class.java -> {
|
||||||
// Intercept the data feed methods and return a ReconnectingObservable instance
|
// Intercept the data feed methods and return a ReconnectingObservable instance
|
||||||
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args))
|
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args,
|
||||||
|
reconnectingRPCConnection.gracefulReconnect.maxAttempts))
|
||||||
val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) {
|
val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) {
|
||||||
// This handles reconnecting and creates new feeds.
|
// This handles reconnecting and creates new feeds.
|
||||||
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
|
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
|
||||||
@ -296,7 +308,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
initialFeed.copy(updates = observable)
|
initialFeed.copy(updates = observable)
|
||||||
}
|
}
|
||||||
// TODO - add handlers for Observable return types.
|
// TODO - add handlers for Observable return types.
|
||||||
else -> doInvoke(method, args)
|
else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -371,13 +371,19 @@ More specifically, the behaviour in the second case is a bit more subtle:
|
|||||||
|
|
||||||
.. warning:: In this approach, some events might be lost during a reconnection and not sent from the subscribed ``Observable``\s.
|
.. warning:: In this approach, some events might be lost during a reconnection and not sent from the subscribed ``Observable``\s.
|
||||||
|
|
||||||
You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way:
|
You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter, which is an object containing 3 optional fields:
|
||||||
|
|
||||||
|
* ``onDisconnect``: A callback handler that will be invoked every time the connection is disconnected.
|
||||||
|
* ``onReconnect``: A callback handler that will be invoked every time the connection is established again after a disconnection.
|
||||||
|
* ``maxAttempts``: The maximum number of attempts that will be performed per RPC operation. A negative value implies infinite retries. The default value is 5.
|
||||||
|
|
||||||
|
This can be used in the following way:
|
||||||
|
|
||||||
.. container:: codeset
|
.. container:: codeset
|
||||||
|
|
||||||
.. sourcecode:: kotlin
|
.. sourcecode:: kotlin
|
||||||
|
|
||||||
val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/})
|
val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/}, maxAttempts = 3)
|
||||||
val cordaClient = CordaRPCClient(nodeRpcAddress)
|
val cordaClient = CordaRPCClient(nodeRpcAddress)
|
||||||
val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = gracefulReconnect).proxy
|
val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = gracefulReconnect).proxy
|
||||||
|
|
||||||
@ -392,7 +398,7 @@ You can enable this graceful form of reconnection by using the ``gracefulReconne
|
|||||||
}
|
}
|
||||||
|
|
||||||
void method() {
|
void method() {
|
||||||
GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect);
|
GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect, 3);
|
||||||
CordaRPCClient cordaClient = new CordaRPCClient(nodeRpcAddress);
|
CordaRPCClient cordaClient = new CordaRPCClient(nodeRpcAddress);
|
||||||
CordaRPCConnection cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect);
|
CordaRPCConnection cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user