diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index f4fa6be9f4..2092df3926 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -398,7 +398,7 @@ class RPCStabilityTests { servers[response]!!.shutdown() servers.remove(response) - //failover will take some time + // Failover will take some time. while (true) { try { response = client.serverId() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index ce5d0c617f..db45afe4fb 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -10,9 +10,9 @@ package net.corda.client.rpc +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.messaging.CordaRPCOps @@ -103,7 +103,8 @@ interface CordaRPCClientConfiguration { * [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw * [RPCException] and previously returned observables will call onError(). * - * If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode) + * If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in + * HA mode). * * @param hostAndPort The network address to connect to. * @param configuration An optional configuration used to tweak client behaviour. diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 6f174da6f5..1e03203031 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -80,6 +80,12 @@ import kotlin.reflect.jvm.javaMethod * unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually * automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s. * The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor]. + * + * The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocataor] instance + * passed in the constructor, failover is either handle at Artemis level or client level. If only one transport + * was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration]. + * If a list of transport configurations was used, failover is handled locally. Artemis is able to do it, however the + * brokers on server side need to be configured in HA mode and the [ServerLocator] needs to be created with HA as well. */ class RPCClientProxyHandler( private val rpcConfiguration: CordaRPCClientConfiguration, @@ -185,7 +191,7 @@ class RPCClientProxyHandler( private val deduplicationSequenceNumber = AtomicLong(0) private val sendingEnabled = AtomicBoolean(true) - // used to interrupt failover thread (i.e. client is closed while failing over) + // Used to interrupt failover thread (i.e. client is closed while failing over). private var haFailoverThread: Thread? = null /** @@ -427,7 +433,7 @@ class RPCClientProxyHandler( } private fun attemptReconnect() { - var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size + var reconnectAttempts = rpcConfiguration.maxReconnectAttempts.times(serverLocator.staticTransportConfigurations.size) var retryInterval = rpcConfiguration.connectionRetryInterval val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval @@ -456,7 +462,7 @@ class RPCClientProxyHandler( continue } - log.debug("Connected successfully using ${transport.params}") + log.debug("Connected successfully after $reconnectAttempts attempts using ${transport.params}.") log.info("RPC server available.") sessionFactory!!.addFailoverListener(this::haFailoverHandler) initSessions() @@ -495,7 +501,7 @@ class RPCClientProxyHandler( haFailoverThread = Thread.currentThread() attemptReconnect() } - /* Other events are not considered as reconnection is not done by Artemis */ + // Other events are not considered as reconnection is not done by Artemis. } private fun failoverHandler(event: FailoverEventType) { diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 3139e82d82..277ebbaf52 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -188,29 +188,6 @@ class NodeSchedulerService(private val clock: CordaClock, } } - /** - * Stop scheduler service. - */ - fun stop() { - mutex.locked { - schedulerTimerExecutor.shutdown() - scheduledStatesQueue.clear() - scheduledStates.clear() - } - } - - /** - * Resume scheduler service after having called [stop]. - */ - fun resume() { - mutex.locked { - schedulerTimerExecutor = Executors.newSingleThreadExecutor() - scheduledStates.putAll(createMap()) - scheduledStatesQueue.addAll(scheduledStates.values) - rescheduleWakeUp() - } - } - override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } val previousState = scheduledStates[action.ref] @@ -250,7 +227,7 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private var schedulerTimerExecutor = Executors.newSingleThreadExecutor() + private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() /** * This method first cancels the [java.util.concurrent.Future] for any pending action so that the * [awaitWithDeadline] used below drops through without running the action. We then create a new diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 61c02bd312..c7cc2ce71f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -15,7 +15,6 @@ import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox -import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient @@ -352,8 +351,6 @@ class P2PMessagingClient(val config: NodeConfiguration, private val shutdownLatch = CountDownLatch(1) - var runningFuture = openFuture() - /** * Starts the p2p event loop: this method only returns once [stop] has been called. */ @@ -364,7 +361,6 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true - runningFuture.set(Unit) // If it's null, it means we already called stop, so return immediately. if (p2pConsumer == null) { return @@ -481,7 +477,6 @@ class P2PMessagingClient(val config: NodeConfiguration, check(started) val prevRunning = running running = false - runningFuture = openFuture() networkChangeSubscription?.unsubscribe() require(p2pConsumer != null, { "stop can't be called twice" }) require(producer != null, { "stop can't be called twice" }) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index c5ea0acdc1..4bc667b8a3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -71,10 +71,6 @@ class MultiThreadedStateMachineManager( private val unfinishedFibers: ReusableLatch = ReusableLatch(), private val classloader: ClassLoader = MultiThreadedStateMachineManager::class.java.classLoader ) : StateMachineManager, StateMachineManagerInternal { - override fun resume() { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. - } - companion object { private val logger = contextLogger() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 752886fb8c..03f770809c 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -144,20 +144,6 @@ class SingleThreadedStateMachineManager( } } - override fun resume() { - fiberDeserializationChecker?.start(checkpointSerializationContext!!) - val fibers = restoreFlowsFromCheckpoints() - Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> - (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) - } - serviceHub.networkMapCache.nodeReady.then { - resumeRestoredFlows(fibers) - } - mutex.locked { - stopping = false - } - } - override fun > findStateMachines(flowClass: Class): List>> { return mutex.locked { flows.values.mapNotNull { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 51e12e57db..446e99d536 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -49,11 +49,6 @@ interface StateMachineManager { */ fun stop(allowedUnsuspendedFiberCount: Int) - /** - * Resume state machine manager after having called [stop]. - */ - fun resume() - /** * Starts a new flow. * diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt index 72f42be049..c7ded2c353 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/FiberDeserializationCheckingInterceptor.kt @@ -99,7 +99,6 @@ class FiberDeserializationChecker { fun stop(): Boolean { jobQueue.add(Job.Finish) checkerThread?.join() - checkerThread = null return foundUnrestorableFibers } }