diff --git a/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt b/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt index df383eb4a0..8569835beb 100644 --- a/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt +++ b/client/src/main/kotlin/net/corda/client/model/NodeMonitorModel.kt @@ -9,6 +9,7 @@ import net.corda.core.messaging.StateMachineUpdate import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.StateMachineTransactionMapping import net.corda.core.node.services.Vault +import net.corda.core.seconds import net.corda.core.transactions.SignedTransaction import net.corda.node.services.config.SSLConfiguration import net.corda.node.services.messaging.CordaRPCClient @@ -52,7 +53,9 @@ class NodeMonitorModel { * TODO provide an unsubscribe mechanism */ fun register(nodeHostAndPort: HostAndPort, sslConfig: SSLConfiguration, username: String, password: String) { - val client = CordaRPCClient(nodeHostAndPort, sslConfig) + val client = CordaRPCClient(nodeHostAndPort, sslConfig){ + maxRetryInterval = 10.seconds.toMillis() + } client.start(username, password) val proxy = client.proxy() diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt index 10514e08f3..7c68bd24b3 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CordaRPCClient.kt @@ -4,6 +4,8 @@ import com.google.common.net.HostAndPort import net.corda.core.ThreadBox import net.corda.core.logElapsedTime import net.corda.core.messaging.CordaRPCOps +import net.corda.core.minutes +import net.corda.core.seconds import net.corda.core.utilities.loggerFor import net.corda.node.services.config.SSLConfiguration import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound @@ -11,6 +13,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSessionFactory +import org.apache.activemq.artemis.api.core.client.ServerLocator import rx.Observable import java.io.Closeable import java.time.Duration @@ -24,7 +27,7 @@ import javax.annotation.concurrent.ThreadSafe * @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will not be authenticated, nor will RPC traffic be encrypted. */ @ThreadSafe -class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?) : Closeable, ArtemisMessagingComponent() { +class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() { private companion object { val log = loggerFor() } @@ -49,11 +52,15 @@ class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguratio check(!running) log.logElapsedTime("Startup") { checkStorePasswords() - val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port)) - serverLocator.threadPoolMaxSize = 1 - // TODO: Configure session reconnection, confirmation window sizes and other Artemis features. - // This will allow reconnection in case of server restart/network outages/IP address changes, etc. - // See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html + val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port)).apply { + // TODO: Put these in config file or make it user configurable? + threadPoolMaxSize = 1 + confirmationWindowSize = 100000 // a guess + retryInterval = 5.seconds.toMillis() + retryIntervalMultiplier = 1.5 // Exponential backoff + maxRetryInterval = 3.minutes.toMillis() + serviceConfigurationOverride?.invoke(this) + } sessionFactory = serverLocator.createSessionFactory() session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize) session.start()