[CORDA-3304] Introduce max number of retries per invocation for reconnecting rpc

This commit is contained in:
Dimos Raptis 2019-10-14 11:11:42 +01:00
parent c18c3aed95
commit a7884c53aa
5 changed files with 39 additions and 13 deletions

View File

@ -3,6 +3,10 @@
<option name="LINE_SEPARATOR" value="&#10;" /> <option name="LINE_SEPARATOR" value="&#10;" />
<option name="RIGHT_MARGIN" value="140" /> <option name="RIGHT_MARGIN" value="140" />
<option name="SOFT_MARGINS" value="140" /> <option name="SOFT_MARGINS" value="140" />
<JavaCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
</JavaCodeStyleSettings>
<JetCodeStyleSettings> <JetCodeStyleSettings>
<option name="PACKAGES_TO_USE_STAR_IMPORTS"> <option name="PACKAGES_TO_USE_STAR_IMPORTS">
<value> <value>
@ -11,6 +15,8 @@
<package name="tornadofx" withSubpackages="false" static="false" /> <package name="tornadofx" withSubpackages="false" static="false" />
</value> </value>
</option> </option>
<option name="NAME_COUNT_TO_USE_STAR_IMPORT" value="2147483647" />
<option name="NAME_COUNT_TO_USE_STAR_IMPORT_FOR_MEMBERS" value="2147483647" />
<option name="CONTINUATION_INDENT_IN_PARAMETER_LISTS" value="true" /> <option name="CONTINUATION_INDENT_IN_PARAMETER_LISTS" value="true" />
<option name="CONTINUATION_INDENT_IN_ARGUMENT_LISTS" value="true" /> <option name="CONTINUATION_INDENT_IN_ARGUMENT_LISTS" value="true" />
<option name="CONTINUATION_INDENT_FOR_EXPRESSION_BODIES" value="true" /> <option name="CONTINUATION_INDENT_FOR_EXPRESSION_BODIES" value="true" />

View File

@ -1,6 +1,5 @@
<component name="ProjectCodeStyleConfiguration"> <component name="ProjectCodeStyleConfiguration">
<state> <state>
<option name="USE_PER_PROJECT_SETTINGS" value="true" /> <option name="USE_PER_PROJECT_SETTINGS" value="true" />
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state> </state>
</component> </component>

View File

@ -282,10 +282,12 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* *
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect * @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect * @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
* @param maxRetries the maximum number of retries per each individual RPC call. A negative number indicates infinite number of retries.
* The default value is 5.
*/ */
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}) { class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxRetries: Int = 5) {
constructor(onDisconnect: Runnable, onReconnect: Runnable ) : constructor(onDisconnect: Runnable, onReconnect: Runnable, maxRetries: Int = 5) :
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }) this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxRetries = maxRetries)
} }
/** /**

View File

@ -9,6 +9,13 @@ open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeExcep
constructor(msg: String) : this(msg, null) constructor(msg: String) : this(msg, null)
} }
/**
* Thrown to indicate an RPC operation has been retried for the [maxNumberOfRetries] unsuccessfully.
* @param maxNumberOfRetries the number of retries that had been performed.
* @param cause the cause of the last failed attempt.
*/
class MaxRpcRetryException(maxNumberOfRetries: Int, cause: Throwable?): RPCException("Max number of retries ($maxNumberOfRetries) was reached.", cause)
/** /**
* Signals that the underlying [RPCConnection] dropped. * Signals that the underlying [RPCConnection] dropped.
*/ */

View File

@ -20,6 +20,7 @@ import net.corda.nodeapi.exceptions.RejectedCommandException
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
import java.lang.RuntimeException
import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method import java.lang.reflect.Method
@ -55,7 +56,7 @@ class ReconnectingCordaRPCOps private constructor(
username: String, username: String,
password: String, password: String,
rpcConfiguration: CordaRPCClientConfiguration, rpcConfiguration: CordaRPCClientConfiguration,
gracefulReconnect: GracefulReconnect? = null, gracefulReconnect: GracefulReconnect = GracefulReconnect(),
sslConfiguration: ClientRpcSslOptions? = null, sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null, classLoader: ClassLoader? = null,
observersPool: ExecutorService observersPool: ExecutorService
@ -117,7 +118,7 @@ class ReconnectingCordaRPCOps private constructor(
val rpcConfiguration: CordaRPCClientConfiguration, val rpcConfiguration: CordaRPCClientConfiguration,
val sslConfiguration: ClientRpcSslOptions? = null, val sslConfiguration: ClientRpcSslOptions? = null,
val classLoader: ClassLoader?, val classLoader: ClassLoader?,
val gracefulReconnect: GracefulReconnect? = null, val gracefulReconnect: GracefulReconnect = GracefulReconnect(),
val observersPool: ExecutorService val observersPool: ExecutorService
) : RPCConnection<CordaRPCOps> { ) : RPCConnection<CordaRPCOps> {
private var currentRPCConnection: CordaRPCConnection? = null private var currentRPCConnection: CordaRPCConnection? = null
@ -147,13 +148,13 @@ class ReconnectingCordaRPCOps private constructor(
// First one to get here gets to do all the reconnect logic, including calling onDisconnect and onReconnect. This makes sure // First one to get here gets to do all the reconnect logic, including calling onDisconnect and onReconnect. This makes sure
// that they're only called once per reconnect. // that they're only called once per reconnect.
currentState = DIED currentState = DIED
gracefulReconnect?.onDisconnect?.invoke() gracefulReconnect.onDisconnect.invoke()
//TODO - handle error cases //TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}") log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e) log.debug("", e)
connect() connect()
previousConnection?.forceClose() previousConnection?.forceClose()
gracefulReconnect?.onReconnect?.invoke() gracefulReconnect.onReconnect.invoke()
} }
/** /**
* Called on external error. * Called on external error.
@ -249,9 +250,16 @@ class ReconnectingCordaRPCOps private constructor(
} }
} }
private fun doInvoke(method: Method, args: Array<out Any>?): Any? { /**
// will stop looping when [method.invoke] succeeds * This method retries the invoked operation in a loop by re-establishing the connection when there is a problem
while (true) { * and checking if the [maxNumberOfRetries] has been exhausted.
*
* A negative number for [maxNumberOfRetries] means an unlimited number of retries will be performed.
*/
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfRetries: Int): Any? {
var remainingRetries = maxNumberOfRetries
var lastException: Throwable? = null
while (remainingRetries != 0) {
try { try {
log.debug { "Invoking RPC $method..." } log.debug { "Invoking RPC $method..." }
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
@ -280,15 +288,19 @@ class ReconnectingCordaRPCOps private constructor(
checkIfIsStartFlow(method, e) checkIfIsStartFlow(method, e)
} }
} }
lastException = e.targetException
remainingRetries--
} }
} }
throw MaxRpcRetryException(maxNumberOfRetries, lastException)
} }
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? { override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
return when (method.returnType) { return when (method.returnType) {
DataFeed::class.java -> { DataFeed::class.java -> {
// Intercept the data feed methods and return a ReconnectingObservable instance // Intercept the data feed methods and return a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args)) val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxRetries))
val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) { val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) {
// This handles reconnecting and creates new feeds. // This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
@ -296,7 +308,7 @@ class ReconnectingCordaRPCOps private constructor(
initialFeed.copy(updates = observable) initialFeed.copy(updates = observable)
} }
// TODO - add handlers for Observable return types. // TODO - add handlers for Observable return types.
else -> doInvoke(method, args) else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxRetries)
} }
} }
} }