Added reconnect capability to RPC (#192)

* Added reconnect capability to RPC
* Issue - https://github.com/corda/corda/issues/184
* JIRA - https://r3-cev.atlassian.net/browse/CORDA-189
This commit is contained in:
Patrick Kuo 2017-01-31 17:04:05 +00:00 committed by GitHub
parent 646ce8afe0
commit 7f96b752d1
2 changed files with 17 additions and 7 deletions

View File

@ -9,6 +9,7 @@ import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.messaging.CordaRPCClient
@ -52,7 +53,9 @@ class NodeMonitorModel {
* TODO provide an unsubscribe mechanism
*/
fun register(nodeHostAndPort: HostAndPort, sslConfig: SSLConfiguration, username: String, password: String) {
val client = CordaRPCClient(nodeHostAndPort, sslConfig)
val client = CordaRPCClient(nodeHostAndPort, sslConfig){
maxRetryInterval = 10.seconds.toMillis()
}
client.start(username, password)
val proxy = client.proxy()

View File

@ -4,6 +4,8 @@ import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.logElapsedTime
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.minutes
import net.corda.core.seconds
import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
@ -11,6 +13,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Observable
import java.io.Closeable
import java.time.Duration
@ -24,7 +27,7 @@ import javax.annotation.concurrent.ThreadSafe
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will not be authenticated, nor will RPC traffic be encrypted.
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?) : Closeable, ArtemisMessagingComponent() {
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
private companion object {
val log = loggerFor<CordaRPCClient>()
}
@ -49,11 +52,15 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio
check(!running)
log.logElapsedTime("Startup") {
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.
// See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port)).apply {
// TODO: Put these in config file or make it user configurable?
threadPoolMaxSize = 1
confirmationWindowSize = 100000 // a guess
retryInterval = 5.seconds.toMillis()
retryIntervalMultiplier = 1.5 // Exponential backoff
maxRetryInterval = 3.minutes.toMillis()
serviceConfigurationOverride?.invoke(this)
}
sessionFactory = serverLocator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()