Revert "CORDA-3281: Add a check for shutdown to avoid some of the errors (#5562)" (#5578)

This reverts commit d983b7dd70.
This commit is contained in:
Ryan Fowler
2019-10-11 13:29:30 +01:00
committed by Rick Parker
parent 7a929f177c
commit f5a966ee3b

View File

@ -89,12 +89,7 @@ class ReconnectingCordaRPCOps private constructor(
* *
* Note that this method does not guarantee 100% that the flow will not be started twice. * Note that this method does not guarantee 100% that the flow will not be started twice.
*/ */
fun runFlowWithLogicalRetry( fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) {
runFlow: (CordaRPCOps) -> StateMachineRunId,
hasFlowStarted: (CordaRPCOps) -> Boolean,
onFlowConfirmed: () -> Unit = {},
timeout: Duration = 4.seconds
) {
try { try {
runFlow(this) runFlow(this)
onFlowConfirmed() onFlowConfirmed()
@ -254,40 +249,39 @@ class ReconnectingCordaRPCOps private constructor(
} }
} }
private tailrec fun doInvoke(method: Method, args: Array<out Any>?): Any? { private fun doInvoke(method: Method, args: Array<out Any>?): Any? {
// will stop recursing when [method.invoke] succeeds // will stop looping when [method.invoke] succeeds
try { while (true) {
log.debug { "Invoking RPC $method..." } try {
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { log.debug { "Invoking RPC $method..." }
log.debug { "RPC $method invoked successfully." } return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
} log.debug { "RPC $method invoked successfully." }
} catch (e: InvocationTargetException) {
when (e.targetException) {
is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.reconnectOnError(e)
} }
is ConnectionFailureException -> { } catch (e: InvocationTargetException) {
if (!reconnectingRPCConnection.proxy.isWaitingForShutdown()) { when (e.targetException) {
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e) is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.reconnectOnError(e) reconnectingRPCConnection.reconnectOnError(e)
} }
checkIfIsStartFlow(method, e) is ConnectionFailureException -> {
} log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
is RPCException -> { reconnectingRPCConnection.reconnectOnError(e)
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e) checkIfIsStartFlow(method, e)
reconnectingRPCConnection.reconnectOnError(e) }
Thread.sleep(1000) // TODO - explain why this sleep is necessary is RPCException -> {
checkIfIsStartFlow(method, e) log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
} reconnectingRPCConnection.reconnectOnError(e)
else -> { Thread.sleep(1000) // TODO - explain why this sleep is necessary
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e) checkIfIsStartFlow(method, e)
reconnectingRPCConnection.reconnectOnError(e) }
checkIfIsStartFlow(method, e) else -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
checkIfIsStartFlow(method, e)
}
} }
} }
} }
return doInvoke(method, args)
} }
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? { override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {