mirror of
https://github.com/corda/corda.git
synced 2025-01-30 16:14:39 +00:00
Address PR comments
This commit is contained in:
parent
be083d6763
commit
e51878417b
@ -388,7 +388,7 @@ class RPCStabilityTests {
|
|||||||
servers[response]!!.shutdown()
|
servers[response]!!.shutdown()
|
||||||
servers.remove(response)
|
servers.remove(response)
|
||||||
|
|
||||||
//failover will take some time
|
// Failover will take some time.
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
response = client.serverId()
|
response = client.serverId()
|
||||||
|
@ -93,7 +93,8 @@ interface CordaRPCClientConfiguration {
|
|||||||
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
|
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
|
||||||
* [RPCException] and previously returned observables will call onError().
|
* [RPCException] and previously returned observables will call onError().
|
||||||
*
|
*
|
||||||
* If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode)
|
* If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in
|
||||||
|
* HA mode).
|
||||||
*
|
*
|
||||||
* @param hostAndPort The network address to connect to.
|
* @param hostAndPort The network address to connect to.
|
||||||
* @param configuration An optional configuration used to tweak client behaviour.
|
* @param configuration An optional configuration used to tweak client behaviour.
|
||||||
@ -130,14 +131,6 @@ class CordaRPCClient private constructor(
|
|||||||
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
|
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
|
||||||
}
|
}
|
||||||
|
|
||||||
internal fun createWithSsl(
|
|
||||||
haAddressPool: List<NetworkHostAndPort>,
|
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
|
||||||
sslConfiguration: SSLConfiguration? = null
|
|
||||||
): CordaRPCClient {
|
|
||||||
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, null, haAddressPool)
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun createWithSslAndClassLoader(
|
internal fun createWithSslAndClassLoader(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||||
@ -146,15 +139,6 @@ class CordaRPCClient private constructor(
|
|||||||
): CordaRPCClient {
|
): CordaRPCClient {
|
||||||
return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader)
|
return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader)
|
||||||
}
|
}
|
||||||
|
|
||||||
internal fun createWithSslAndClassLoader(
|
|
||||||
haAddressPool: List<NetworkHostAndPort>,
|
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
|
||||||
sslConfiguration: SSLConfiguration? = null,
|
|
||||||
classLoader: ClassLoader? = null
|
|
||||||
): CordaRPCClient {
|
|
||||||
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, classLoader, haAddressPool)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
@ -70,6 +70,12 @@ import kotlin.reflect.jvm.javaMethod
|
|||||||
* unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually
|
* unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually
|
||||||
* automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
|
* automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
|
||||||
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
|
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
|
||||||
|
*
|
||||||
|
* The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocataor] instance
|
||||||
|
* passed in the constructor, failover is either handle at Artemis level or client level. If only one transport
|
||||||
|
* was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration].
|
||||||
|
* If a list of transport configurations was used, failover is handled locally. Artemis is able to do it, however the
|
||||||
|
* brokers on server side need to be configured in HA mode and the [ServerLocator] needs to be created with HA as well.
|
||||||
*/
|
*/
|
||||||
class RPCClientProxyHandler(
|
class RPCClientProxyHandler(
|
||||||
private val rpcConfiguration: CordaRPCClientConfiguration,
|
private val rpcConfiguration: CordaRPCClientConfiguration,
|
||||||
@ -175,7 +181,7 @@ class RPCClientProxyHandler(
|
|||||||
private val deduplicationSequenceNumber = AtomicLong(0)
|
private val deduplicationSequenceNumber = AtomicLong(0)
|
||||||
|
|
||||||
private val sendingEnabled = AtomicBoolean(true)
|
private val sendingEnabled = AtomicBoolean(true)
|
||||||
// used to interrupt failover thread (i.e. client is closed while failing over)
|
// Used to interrupt failover thread (i.e. client is closed while failing over).
|
||||||
private var haFailoverThread: Thread? = null
|
private var haFailoverThread: Thread? = null
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -440,13 +446,13 @@ class RPCClientProxyHandler(
|
|||||||
try {
|
try {
|
||||||
Thread.sleep(retryInterval.toMillis())
|
Thread.sleep(retryInterval.toMillis())
|
||||||
} catch (e: InterruptedException) {}
|
} catch (e: InterruptedException) {}
|
||||||
// could not connect, try with next server transport
|
// Could not connect, try with next server transport.
|
||||||
reconnectAttempts--
|
reconnectAttempts--
|
||||||
retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong()))
|
retryInterval = minOf(maxRetryInterval, retryInterval.times(rpcConfiguration.connectionRetryIntervalMultiplier.toLong()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.debug("Connected successfully using ${transport.params}")
|
log.debug("Connected successfully after $reconnectAttempts attempts using ${transport.params}.")
|
||||||
log.info("RPC server available.")
|
log.info("RPC server available.")
|
||||||
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
|
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
|
||||||
initSessions()
|
initSessions()
|
||||||
@ -485,7 +491,7 @@ class RPCClientProxyHandler(
|
|||||||
haFailoverThread = Thread.currentThread()
|
haFailoverThread = Thread.currentThread()
|
||||||
attemptReconnect()
|
attemptReconnect()
|
||||||
}
|
}
|
||||||
/* Other events are not considered as reconnection is not done by Artemis */
|
// Other events are not considered as reconnection is not done by Artemis.
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun failoverHandler(event: FailoverEventType) {
|
private fun failoverHandler(event: FailoverEventType) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user