diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index b495570148..616f5b6b14 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -103,19 +103,34 @@ interface CordaRPCClientConfiguration { * [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw * [RPCException] and previously returned observables will call onError(). * + * If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode) + * * @param hostAndPort The network address to connect to. * @param configuration An optional configuration used to tweak client behaviour. * @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server. + * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. + * The client will attempt to connect to a live server by trying each address in the list. If the servers are not in + * HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to. */ class CordaRPCClient private constructor( - hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), - sslConfiguration: SSLConfiguration? = null, - classLoader: ClassLoader? = null + private val hostAndPort: NetworkHostAndPort, + private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + private val sslConfiguration: SSLConfiguration? = null, + private val classLoader: ClassLoader? = null, + private val haAddressPool: List = emptyList() ) { @JvmOverloads constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null) + /** + * @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode. + * The client will attempt to connect to a live server by trying each address in the list. If the servers are not in + * HA mode, no failover will occur and the client will try reconnecting to the initial server it connected to. + * @param configuration An optional configuration used to tweak client behaviour. + */ + @JvmOverloads + constructor(haAddressPool: List, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, haAddressPool) + companion object { internal fun createWithSsl( hostAndPort: NetworkHostAndPort, @@ -125,6 +140,14 @@ class CordaRPCClient private constructor( return CordaRPCClient(hostAndPort, configuration, sslConfiguration) } + internal fun createWithSsl( + haAddressPool: List, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + sslConfiguration: SSLConfiguration? = null + ): CordaRPCClient { + return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, null, haAddressPool) + } + internal fun createWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), @@ -133,6 +156,15 @@ class CordaRPCClient private constructor( ): CordaRPCClient { return CordaRPCClient(hostAndPort, configuration, sslConfiguration, classLoader) } + + internal fun createWithSslAndClassLoader( + haAddressPool: List, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + sslConfiguration: SSLConfiguration? = null, + classLoader: ClassLoader? = null + ): CordaRPCClient { + return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, classLoader, haAddressPool) + } } init { @@ -147,11 +179,19 @@ class CordaRPCClient private constructor( } } - private val rpcClient = RPCClient( - tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), - configuration, - if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT - ) + private fun getRpcClient() : RPCClient { + return if (haAddressPool.isEmpty()) { + RPCClient( + tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), + configuration, + if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + } else { + RPCClient(haAddressPool, + sslConfiguration, + configuration, + if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT) + } + } /** * Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable] @@ -179,7 +219,7 @@ class CordaRPCClient private constructor( * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection { - return CordaRPCConnection(rpcClient.start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor)) + return CordaRPCConnection(getRpcClient().start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor)) } /** diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt index dc45c1b729..61c436939d 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt @@ -25,6 +25,12 @@ fun createCordaRPCClientWithSsl( sslConfiguration: SSLConfiguration? = null ) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration) +fun createCordaRPCClientWithSsl( + haAddressPool: List, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + sslConfiguration: SSLConfiguration? = null +) = CordaRPCClient.createWithSsl(haAddressPool, configuration, sslConfiguration) + fun createCordaRPCClientWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), @@ -32,6 +38,13 @@ fun createCordaRPCClientWithSslAndClassLoader( classLoader: ClassLoader? = null ) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) +fun createCordaRPCClientWithSslAndClassLoader( + haAddressPool: List, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + sslConfiguration: SSLConfiguration? = null, + classLoader: ClassLoader? = null +) = CordaRPCClient.createWithSslAndClassLoader(haAddressPool, configuration, sslConfiguration, classLoader) + fun CordaRPCOps.drainAndShutdown(): Observable { setFlowsDrainingModeEnabled(true) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 2b2e1539f6..77094eb268 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -24,6 +24,7 @@ import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.* import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport +import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransportsFromList import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.RPCApi import net.corda.nodeapi.internal.config.SSLConfiguration @@ -70,7 +71,8 @@ data class CordaRPCClientConfigurationImpl( class RPCClient( val transport: TransportConfiguration, val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, - val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT + val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT, + val haPoolTransportConfigurations: List = emptyList() ) { constructor( hostAndPort: NetworkHostAndPort, @@ -79,6 +81,14 @@ class RPCClient( serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext) + constructor( + haAddressPool: List, + sslConfiguration: SSLConfiguration? = null, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT + ) : this(tcpTransport(ConnectionDirection.Outbound(), haAddressPool.first(), sslConfiguration), + configuration, serializationContext, tcpTransportsFromList(ConnectionDirection.Outbound(), haAddressPool, sslConfiguration)) + companion object { private val log = contextLogger() } @@ -93,7 +103,11 @@ class RPCClient( return log.logElapsedTime("Startup") { val clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.${random63BitValue()}") - val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(transport).apply { + val serverLocator = (if (haPoolTransportConfigurations.isEmpty()) { + ActiveMQClient.createServerLocatorWithoutHA(transport) + } else { + ActiveMQClient.createServerLocatorWithHA(*haPoolTransportConfigurations.toTypedArray()) + }).apply { retryInterval = rpcConfiguration.connectionRetryInterval.toMillis() retryIntervalMultiplier = rpcConfiguration.connectionRetryIntervalMultiplier maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval.toMillis() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt index 812134a060..e63b1ae7f9 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisTcpTransport.kt @@ -104,5 +104,19 @@ class ArtemisTcpTransport { } return TransportConfiguration(factoryName, options) } + + /** Create as list of [TransportConfiguration]. **/ + fun tcpTransportsFromList( + direction: ConnectionDirection, + hostAndPortList: List, + config: SSLConfiguration?, + enableSSL: Boolean = true): List{ + val tcpTransports = ArrayList(hostAndPortList.size) + hostAndPortList.forEach { + tcpTransports.add(tcpTransport(direction, it, config, enableSSL)) + } + + return tcpTransports + } } }