mirror of
https://github.com/corda/corda.git
synced 2025-04-30 16:00:02 +00:00
ENT-2132 Introduce a boolean flag which controls whether we should re-try RPC connection. (#3433)
In case of initial logon - it will not be re-tried to cater for invalid endpoint and/or credentials. However, if connection been successfully established once, re-try logic is getting activated.
This commit is contained in:
parent
9be4c5dca4
commit
21d06a8aa9
@ -6,7 +6,6 @@ import javafx.beans.property.SimpleObjectProperty
|
|||||||
import net.corda.client.rpc.CordaRPCClient
|
import net.corda.client.rpc.CordaRPCClient
|
||||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||||
import net.corda.client.rpc.CordaRPCConnection
|
import net.corda.client.rpc.CordaRPCConnection
|
||||||
import net.corda.client.rpc.RPCException
|
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
@ -22,7 +21,6 @@ import net.corda.core.transactions.SignedTransaction
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException
|
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
@ -125,7 +123,7 @@ class NodeMonitorModel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val stateMachines = performRpcReconnect(nodeHostAndPort, username, password)
|
val stateMachines = performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = false)
|
||||||
|
|
||||||
// Extract the flow tracking stream
|
// Extract the flow tracking stream
|
||||||
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
|
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
|
||||||
@ -146,9 +144,9 @@ class NodeMonitorModel {
|
|||||||
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
|
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List<StateMachineInfo> {
|
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): List<StateMachineInfo> {
|
||||||
|
|
||||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password, shouldRetry)
|
||||||
val proxy = connection.proxy
|
val proxy = connection.proxy
|
||||||
|
|
||||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||||
@ -166,7 +164,7 @@ class NodeMonitorModel {
|
|||||||
// force closing the connection to avoid propagation of notification to the server side.
|
// force closing the connection to avoid propagation of notification to the server side.
|
||||||
connection.forceClose()
|
connection.forceClose()
|
||||||
// Perform re-connect.
|
// Perform re-connect.
|
||||||
performRpcReconnect(nodeHostAndPort, username, password)
|
performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = true)
|
||||||
})
|
})
|
||||||
|
|
||||||
retryableStateMachineUpdatesSubscription.set(subscription)
|
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||||
@ -176,7 +174,7 @@ class NodeMonitorModel {
|
|||||||
return stateMachineInfos
|
return stateMachineInfos
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): CordaRPCConnection {
|
||||||
|
|
||||||
val retryInterval = 5.seconds
|
val retryInterval = 5.seconds
|
||||||
|
|
||||||
@ -195,21 +193,15 @@ class NodeMonitorModel {
|
|||||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||||
_connection
|
_connection
|
||||||
} catch (throwable: Throwable) {
|
} catch (throwable: Throwable) {
|
||||||
when (throwable) {
|
if (shouldRetry) {
|
||||||
is ActiveMQException, is RPCException -> {
|
|
||||||
// Happens when:
|
|
||||||
// * incorrect credentials provided;
|
|
||||||
// * incorrect endpoint specified;
|
|
||||||
// - no point to retry connecting.
|
|
||||||
throw throwable
|
|
||||||
}
|
|
||||||
else -> {
|
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
logger.info("Exception upon establishing connection: " + throwable.message)
|
logger.info("Exception upon establishing connection: " + throwable.message)
|
||||||
null
|
null
|
||||||
|
} else {
|
||||||
|
throw throwable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||||
return connection
|
return connection
|
||||||
|
Loading…
x
Reference in New Issue
Block a user