Add documentation and param renaming

This commit is contained in:
Dimos Raptis
2019-10-14 15:07:13 +01:00
parent 05a8f050e3
commit 5e78e0a7ae
3 changed files with 23 additions and 17 deletions

View File

@ -282,12 +282,13 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* *
* @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect * @param onDisconnect implement this callback to perform logic when the RPC disconnects on connection disconnect
* @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect * @param onReconnect implement this callback to perform logic when the RPC has reconnected after connection disconnect
* @param maxRetries the maximum number of retries per each individual RPC call. A negative number indicates infinite number of retries. * @param maxAttempts the maximum number of attempts per each individual RPC call. A negative number indicates infinite number of retries.
* The default value is 5. * The default value is 5.
*/ */
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxRetries: Int = 5) { class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
constructor(onDisconnect: Runnable, onReconnect: Runnable, maxRetries: Int = 5) : @JvmOverloads
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxRetries = maxRetries) constructor(onDisconnect: Runnable, onReconnect: Runnable, maxAttempts: Int = 5) :
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxAttempts = maxAttempts)
} }
/** /**

View File

@ -20,7 +20,6 @@ import net.corda.nodeapi.exceptions.RejectedCommandException
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException
import java.lang.RuntimeException
import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationHandler
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method import java.lang.reflect.Method
@ -252,14 +251,14 @@ class ReconnectingCordaRPCOps private constructor(
/** /**
* This method retries the invoked operation in a loop by re-establishing the connection when there is a problem * This method retries the invoked operation in a loop by re-establishing the connection when there is a problem
* and checking if the [maxNumberOfRetries] has been exhausted. * and checking if the [maxNumberOfAttempts] has been exhausted.
* *
* A negative number for [maxNumberOfRetries] means an unlimited number of retries will be performed. * A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed.
*/ */
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfRetries: Int): Any? { private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? {
var remainingRetries = maxNumberOfRetries var remainingAttempts = maxNumberOfAttempts
var lastException: Throwable? = null var lastException: Throwable? = null
while (remainingRetries != 0) { while (remainingAttempts != 0) {
try { try {
log.debug { "Invoking RPC $method..." } log.debug { "Invoking RPC $method..." }
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
@ -289,11 +288,11 @@ class ReconnectingCordaRPCOps private constructor(
} }
} }
lastException = e.targetException lastException = e.targetException
remainingRetries-- remainingAttempts--
} }
} }
throw MaxRpcRetryException(maxNumberOfRetries, lastException) throw MaxRpcRetryException(maxNumberOfAttempts, lastException)
} }
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? { override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
@ -301,7 +300,7 @@ class ReconnectingCordaRPCOps private constructor(
DataFeed::class.java -> { DataFeed::class.java -> {
// Intercept the data feed methods and return a ReconnectingObservable instance // Intercept the data feed methods and return a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args, val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args,
reconnectingRPCConnection.gracefulReconnect.maxRetries)) reconnectingRPCConnection.gracefulReconnect.maxAttempts))
val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) { val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) {
// This handles reconnecting and creates new feeds. // This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
@ -309,7 +308,7 @@ class ReconnectingCordaRPCOps private constructor(
initialFeed.copy(updates = observable) initialFeed.copy(updates = observable)
} }
// TODO - add handlers for Observable return types. // TODO - add handlers for Observable return types.
else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxRetries) else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts)
} }
} }
} }

View File

@ -371,13 +371,19 @@ More specifically, the behaviour in the second case is a bit more subtle:
.. warning:: In this approach, some events might be lost during a reconnection and not sent from the subscribed ``Observable``\s. .. warning:: In this approach, some events might be lost during a reconnection and not sent from the subscribed ``Observable``\s.
You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter in the following way: You can enable this graceful form of reconnection by using the ``gracefulReconnect`` parameter, which is an object containing 3 optional fields:
* ``onDisconnect``: A callback handler that will be invoked every time the connection is disconnected.
* ``onReconnect``: A callback handler that will be invoked every time the connection is established again after a disconnection.
* ``maxAttempts``: The maximum number of attempts that will be performed per RPC operation. A negative value implies infinite retries. The default value is 5.
This can be used in the following way:
.. container:: codeset .. container:: codeset
.. sourcecode:: kotlin .. sourcecode:: kotlin
val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/}) val gracefulReconnect = GracefulReconnect(onDisconnect={/*insert disconnect handling*/}, onReconnect{/*insert reconnect handling*/}, maxAttempts = 3)
val cordaClient = CordaRPCClient(nodeRpcAddress) val cordaClient = CordaRPCClient(nodeRpcAddress)
val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = gracefulReconnect).proxy val cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect = gracefulReconnect).proxy
@ -392,7 +398,7 @@ You can enable this graceful form of reconnection by using the ``gracefulReconne
} }
void method() { void method() {
GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect); GracefulReconnect gracefulReconnect = new GracefulReconnect(this::onDisconnect, this::onReconnect, 3);
CordaRPCClient cordaClient = new CordaRPCClient(nodeRpcAddress); CordaRPCClient cordaClient = new CordaRPCClient(nodeRpcAddress);
CordaRPCConnection cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect); CordaRPCConnection cordaRpcOps = cordaClient.start(rpcUserName, rpcUserPassword, gracefulReconnect);
} }