ENT-1618: added support for RPC client in HA mode(automatic failover) (#706)

* ENT-1618: added support for RPC client in HA mode(automatic failover)

* RPCClient: renamed variables to avoid confusion about clustering
This commit is contained in:
bpaunescu 2018-04-10 12:38:37 +01:00 committed by GitHub
parent 4ed917436a
commit f022c67689
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 12 deletions

View File

@ -103,19 +103,34 @@ interface CordaRPCClientConfiguration {
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
* [RPCException] and previously returned observables will call onError().
*
* If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode)
*
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
* @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server.
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to.
*/
class CordaRPCClient private constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
private val hostAndPort: NetworkHostAndPort,
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
private val sslConfiguration: SSLConfiguration? = null,
private val classLoader: ClassLoader? = null,
private val haAddressPool: List<NetworkHostAndPort> = emptyList()
) {
@JvmOverloads
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null)
/**
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to.
* @param configuration An optional configuration used to tweak client behaviour.
*/
@JvmOverloads
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, haAddressPool)
companion object {
internal fun createWithSsl(
hostAndPort: NetworkHostAndPort,
@ -125,6 +140,14 @@ class CordaRPCClient private constructor(
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
}
internal fun createWithSsl(
haAddressPool: List<NetworkHostAndPort>,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null
): CordaRPCClient {
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, null, haAddressPool)
}
internal fun createWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
@ -133,6 +156,15 @@ class CordaRPCClient private constructor(
): CordaRPCClient {
return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader)
}
internal fun createWithSslAndClassLoader(
haAddressPool: List<NetworkHostAndPort>,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
): CordaRPCClient {
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, classLoader, haAddressPool)
}
}
init {
@ -147,11 +179,19 @@ class CordaRPCClient private constructor(
}
}
private val rpcClient = RPCClient<CordaRPCOps>(
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
configuration,
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT
)
private fun getRpcClient() : RPCClient<CordaRPCOps> {
return if (haAddressPool.isEmpty()) {
RPCClient(
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
configuration,
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT)
} else {
RPCClient(haAddressPool,
sslConfiguration,
configuration,
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT)
}
}
/**
* Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable]
@ -179,7 +219,7 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection {
return CordaRPCConnection(rpcClient.start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor))
return CordaRPCConnection(getRpcClient().start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor))
}
/**

View File

@ -25,6 +25,12 @@ fun createCordaRPCClientWithSsl(
sslConfiguration: SSLConfiguration? = null
) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration)
fun createCordaRPCClientWithSsl(
haAddressPool: List<NetworkHostAndPort>,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null
) = CordaRPCClient.createWithSsl(haAddressPool, configuration, sslConfiguration)
fun createCordaRPCClientWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
@ -32,6 +38,13 @@ fun createCordaRPCClientWithSslAndClassLoader(
classLoader: ClassLoader? = null
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
fun createCordaRPCClientWithSslAndClassLoader(
haAddressPool: List<NetworkHostAndPort>,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
) = CordaRPCClient.createWithSslAndClassLoader(haAddressPool, configuration, sslConfiguration, classLoader)
fun CordaRPCOps.drainAndShutdown(): Observable<Unit> {
setFlowsDrainingModeEnabled(true)

View File

@ -24,6 +24,7 @@ import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.*
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransportsFromList
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.config.SSLConfiguration
@ -70,7 +71,8 @@ data class CordaRPCClientConfigurationImpl(
class RPCClient<I : RPCOps>(
val transport: TransportConfiguration,
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
) {
constructor(
hostAndPort: NetworkHostAndPort,
@ -79,6 +81,14 @@ class RPCClient<I : RPCOps>(
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext)
constructor(
haAddressPool: List<NetworkHostAndPort>,
sslConfiguration: SSLConfiguration? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(tcpTransport(ConnectionDirection.Outbound(), haAddressPool.first(), sslConfiguration),
configuration, serializationContext, tcpTransportsFromList(ConnectionDirection.Outbound(), haAddressPool, sslConfiguration))
companion object {
private val log = contextLogger()
}
@ -93,7 +103,11 @@ class RPCClient<I : RPCOps>(
return log.logElapsedTime("Startup") {
val clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.${random63BitValue()}")
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(transport).apply {
val serverLocator = (if (haPoolTransportConfigurations.isEmpty()) {
ActiveMQClient.createServerLocatorWithoutHA(transport)
} else {
ActiveMQClient.createServerLocatorWithHA(*haPoolTransportConfigurations.toTypedArray())
}).apply {
retryInterval = rpcConfiguration.connectionRetryInterval.toMillis()
retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier
maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis()

View File

@ -104,5 +104,19 @@ class ArtemisTcpTransport {
}
return TransportConfiguration(factoryName, options)
}
/** Create as list of [TransportConfiguration]. **/
fun tcpTransportsFromList(
direction: ConnectionDirection,
hostAndPortList: List<NetworkHostAndPort>,
config: SSLConfiguration?,
enableSSL: Boolean = true): List<TransportConfiguration>{
val tcpTransports = ArrayList<TransportConfiguration>(hostAndPortList.size)
hostAndPortList.forEach {
tcpTransports.add(tcpTransport(direction, it, config, enableSSL))
}
return tcpTransports
}
}
}