mirror of
https://github.com/corda/corda.git
synced 2025-05-21 09:47:43 +00:00
Merge commit '10c559a3f3dc1cc8055e3204cd289468fbf3e644' into aslemmer-merge-10c559a3f3dc1cc8055e3204cd289468fbf3e644
This commit is contained in:
commit
f7e068b842
@ -398,7 +398,7 @@ class RPCStabilityTests {
|
|||||||
servers[response]!!.shutdown()
|
servers[response]!!.shutdown()
|
||||||
servers.remove(response)
|
servers.remove(response)
|
||||||
|
|
||||||
//failover will take some time
|
// Failover will take some time.
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
response = client.serverId()
|
response = client.serverId()
|
||||||
|
@ -10,9 +10,9 @@
|
|||||||
|
|
||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
|
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||||
import net.corda.client.rpc.internal.KryoClientSerializationScheme
|
import net.corda.client.rpc.internal.KryoClientSerializationScheme
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
|
||||||
import net.corda.core.context.Actor
|
import net.corda.core.context.Actor
|
||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
@ -103,7 +103,8 @@ interface CordaRPCClientConfiguration {
|
|||||||
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
|
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
|
||||||
* [RPCException] and previously returned observables will call onError().
|
* [RPCException] and previously returned observables will call onError().
|
||||||
*
|
*
|
||||||
* If the client was created using a list of hosts, automatic failover will occur(the servers have to be started in HA mode)
|
* If the client was created using a list of hosts, automatic failover will occur (the servers have to be started in
|
||||||
|
* HA mode).
|
||||||
*
|
*
|
||||||
* @param hostAndPort The network address to connect to.
|
* @param hostAndPort The network address to connect to.
|
||||||
* @param configuration An optional configuration used to tweak client behaviour.
|
* @param configuration An optional configuration used to tweak client behaviour.
|
||||||
|
@ -80,6 +80,12 @@ import kotlin.reflect.jvm.javaMethod
|
|||||||
* unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually
|
* unsubscribing from the [Observable], or if the [Observable] is garbage collected the client will eventually
|
||||||
* automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
|
* automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
|
||||||
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
|
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
|
||||||
|
*
|
||||||
|
* The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocataor] instance
|
||||||
|
* passed in the constructor, failover is either handle at Artemis level or client level. If only one transport
|
||||||
|
* was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration].
|
||||||
|
* If a list of transport configurations was used, failover is handled locally. Artemis is able to do it, however the
|
||||||
|
* brokers on server side need to be configured in HA mode and the [ServerLocator] needs to be created with HA as well.
|
||||||
*/
|
*/
|
||||||
class RPCClientProxyHandler(
|
class RPCClientProxyHandler(
|
||||||
private val rpcConfiguration: CordaRPCClientConfiguration,
|
private val rpcConfiguration: CordaRPCClientConfiguration,
|
||||||
@ -185,7 +191,7 @@ class RPCClientProxyHandler(
|
|||||||
private val deduplicationSequenceNumber = AtomicLong(0)
|
private val deduplicationSequenceNumber = AtomicLong(0)
|
||||||
|
|
||||||
private val sendingEnabled = AtomicBoolean(true)
|
private val sendingEnabled = AtomicBoolean(true)
|
||||||
// used to interrupt failover thread (i.e. client is closed while failing over)
|
// Used to interrupt failover thread (i.e. client is closed while failing over).
|
||||||
private var haFailoverThread: Thread? = null
|
private var haFailoverThread: Thread? = null
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -427,7 +433,7 @@ class RPCClientProxyHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun attemptReconnect() {
|
private fun attemptReconnect() {
|
||||||
var reconnectAttempts = rpcConfiguration.maxReconnectAttempts * serverLocator.staticTransportConfigurations.size
|
var reconnectAttempts = rpcConfiguration.maxReconnectAttempts.times(serverLocator.staticTransportConfigurations.size)
|
||||||
var retryInterval = rpcConfiguration.connectionRetryInterval
|
var retryInterval = rpcConfiguration.connectionRetryInterval
|
||||||
val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval
|
val maxRetryInterval = rpcConfiguration.connectionMaxRetryInterval
|
||||||
|
|
||||||
@ -456,7 +462,7 @@ class RPCClientProxyHandler(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.debug("Connected successfully using ${transport.params}")
|
log.debug("Connected successfully after $reconnectAttempts attempts using ${transport.params}.")
|
||||||
log.info("RPC server available.")
|
log.info("RPC server available.")
|
||||||
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
|
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
|
||||||
initSessions()
|
initSessions()
|
||||||
@ -495,7 +501,7 @@ class RPCClientProxyHandler(
|
|||||||
haFailoverThread = Thread.currentThread()
|
haFailoverThread = Thread.currentThread()
|
||||||
attemptReconnect()
|
attemptReconnect()
|
||||||
}
|
}
|
||||||
/* Other events are not considered as reconnection is not done by Artemis */
|
// Other events are not considered as reconnection is not done by Artemis.
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun failoverHandler(event: FailoverEventType) {
|
private fun failoverHandler(event: FailoverEventType) {
|
||||||
|
@ -188,29 +188,6 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Stop scheduler service.
|
|
||||||
*/
|
|
||||||
fun stop() {
|
|
||||||
mutex.locked {
|
|
||||||
schedulerTimerExecutor.shutdown()
|
|
||||||
scheduledStatesQueue.clear()
|
|
||||||
scheduledStates.clear()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resume scheduler service after having called [stop].
|
|
||||||
*/
|
|
||||||
fun resume() {
|
|
||||||
mutex.locked {
|
|
||||||
schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
|
||||||
scheduledStates.putAll(createMap())
|
|
||||||
scheduledStatesQueue.addAll(scheduledStates.values)
|
|
||||||
rescheduleWakeUp()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun scheduleStateActivity(action: ScheduledStateRef) {
|
override fun scheduleStateActivity(action: ScheduledStateRef) {
|
||||||
log.trace { "Schedule $action" }
|
log.trace { "Schedule $action" }
|
||||||
val previousState = scheduledStates[action.ref]
|
val previousState = scheduledStates[action.ref]
|
||||||
@ -250,7 +227,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private var schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
||||||
/**
|
/**
|
||||||
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
|
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
|
||||||
* [awaitWithDeadline] used below drops through without running the action. We then create a new
|
* [awaitWithDeadline] used below drops through without running the action. We then create a new
|
||||||
|
@ -15,7 +15,6 @@ import com.codahale.metrics.MetricRegistry
|
|||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
import net.corda.core.messaging.MessageRecipients
|
import net.corda.core.messaging.MessageRecipients
|
||||||
import net.corda.core.messaging.SingleMessageRecipient
|
import net.corda.core.messaging.SingleMessageRecipient
|
||||||
@ -352,8 +351,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
|
|
||||||
private val shutdownLatch = CountDownLatch(1)
|
private val shutdownLatch = CountDownLatch(1)
|
||||||
|
|
||||||
var runningFuture = openFuture<Unit>()
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts the p2p event loop: this method only returns once [stop] has been called.
|
* Starts the p2p event loop: this method only returns once [stop] has been called.
|
||||||
*/
|
*/
|
||||||
@ -364,7 +361,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
check(started) { "start must be called first" }
|
check(started) { "start must be called first" }
|
||||||
check(!running) { "run can't be called twice" }
|
check(!running) { "run can't be called twice" }
|
||||||
running = true
|
running = true
|
||||||
runningFuture.set(Unit)
|
|
||||||
// If it's null, it means we already called stop, so return immediately.
|
// If it's null, it means we already called stop, so return immediately.
|
||||||
if (p2pConsumer == null) {
|
if (p2pConsumer == null) {
|
||||||
return
|
return
|
||||||
@ -481,7 +477,6 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
|||||||
check(started)
|
check(started)
|
||||||
val prevRunning = running
|
val prevRunning = running
|
||||||
running = false
|
running = false
|
||||||
runningFuture = openFuture()
|
|
||||||
networkChangeSubscription?.unsubscribe()
|
networkChangeSubscription?.unsubscribe()
|
||||||
require(p2pConsumer != null, { "stop can't be called twice" })
|
require(p2pConsumer != null, { "stop can't be called twice" })
|
||||||
require(producer != null, { "stop can't be called twice" })
|
require(producer != null, { "stop can't be called twice" })
|
||||||
|
@ -71,10 +71,6 @@ class MultiThreadedStateMachineManager(
|
|||||||
private val unfinishedFibers: ReusableLatch = ReusableLatch(),
|
private val unfinishedFibers: ReusableLatch = ReusableLatch(),
|
||||||
private val classloader: ClassLoader = MultiThreadedStateMachineManager::class.java.classLoader
|
private val classloader: ClassLoader = MultiThreadedStateMachineManager::class.java.classLoader
|
||||||
) : StateMachineManager, StateMachineManagerInternal {
|
) : StateMachineManager, StateMachineManagerInternal {
|
||||||
override fun resume() {
|
|
||||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
|
||||||
}
|
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private val logger = contextLogger()
|
private val logger = contextLogger()
|
||||||
}
|
}
|
||||||
|
@ -144,20 +144,6 @@ class SingleThreadedStateMachineManager(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun resume() {
|
|
||||||
fiberDeserializationChecker?.start(checkpointSerializationContext!!)
|
|
||||||
val fibers = restoreFlowsFromCheckpoints()
|
|
||||||
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
|
|
||||||
(fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable)
|
|
||||||
}
|
|
||||||
serviceHub.networkMapCache.nodeReady.then {
|
|
||||||
resumeRestoredFlows(fibers)
|
|
||||||
}
|
|
||||||
mutex.locked {
|
|
||||||
stopping = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
|
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
|
||||||
return mutex.locked {
|
return mutex.locked {
|
||||||
flows.values.mapNotNull {
|
flows.values.mapNotNull {
|
||||||
|
@ -49,11 +49,6 @@ interface StateMachineManager {
|
|||||||
*/
|
*/
|
||||||
fun stop(allowedUnsuspendedFiberCount: Int)
|
fun stop(allowedUnsuspendedFiberCount: Int)
|
||||||
|
|
||||||
/**
|
|
||||||
* Resume state machine manager after having called [stop].
|
|
||||||
*/
|
|
||||||
fun resume()
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts a new flow.
|
* Starts a new flow.
|
||||||
*
|
*
|
||||||
|
@ -99,7 +99,6 @@ class FiberDeserializationChecker {
|
|||||||
fun stop(): Boolean {
|
fun stop(): Boolean {
|
||||||
jobQueue.add(Job.Finish)
|
jobQueue.add(Job.Finish)
|
||||||
checkerThread?.join()
|
checkerThread?.join()
|
||||||
checkerThread = null
|
|
||||||
return foundUnrestorableFibers
|
return foundUnrestorableFibers
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user