mirror of
https://github.com/corda/corda.git
synced 2024-12-21 22:07:55 +00:00
CORDA-2304 Never give up on notarisations (#4420)
* Make TimedFlows retry forever, and cap the growth of the time out interval. * Only time flows for restart if that is sensible (i.e. notary flows that actually have an alternative node to talk to). * Move check for multi node notary into getter so it the `canBeRestarted` attribute can't be set too late. * Make restartable timed flow a concept on SubFlow metadata and the relevant events so we can handle it properly for subflows based on their metadata. * Addressing review comments. * Consistent naming * Update documentation * Addressing documentation comments.
This commit is contained in:
parent
e70670368c
commit
e8a467cab8
@ -48,6 +48,12 @@ class NotaryFlow {
|
|||||||
fun tracker() = ProgressTracker(REQUESTING, VALIDATING)
|
fun tracker() = ProgressTracker(REQUESTING, VALIDATING)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override val isTimeoutEnabled: Boolean
|
||||||
|
get() {
|
||||||
|
val notaryParty = stx.notary ?: throw IllegalStateException("Transaction does not specify a Notary")
|
||||||
|
return serviceHub.networkMapCache.getNodesByLegalIdentityKey(notaryParty.owningKey).size > 1
|
||||||
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
@Throws(NotaryException::class)
|
@Throws(NotaryException::class)
|
||||||
override fun call(): List<TransactionSignature> {
|
override fun call(): List<TransactionSignature> {
|
||||||
|
@ -20,4 +20,6 @@ interface IdempotentFlow
|
|||||||
* persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow].
|
* persisted. Otherwise, it wouldn't be possible to correctly reset the [TimedFlow].
|
||||||
*/
|
*/
|
||||||
// TODO: allow specifying retry settings per flow
|
// TODO: allow specifying retry settings per flow
|
||||||
interface TimedFlow : IdempotentFlow
|
interface TimedFlow : IdempotentFlow {
|
||||||
|
val isTimeoutEnabled: Boolean
|
||||||
|
}
|
@ -7,6 +7,11 @@ release, see :doc:`upgrade-notes`.
|
|||||||
Unreleased
|
Unreleased
|
||||||
----------
|
----------
|
||||||
|
|
||||||
|
* TimedFlows (only used by the notary client flow) will never give up trying to reach the notary, as this would leave the states
|
||||||
|
in the notarisation request in an undefined state (unknown whether the spend has been notarised, i.e. has happened, or not). Also,
|
||||||
|
retries have been disabled for single node notaries since in this case they offer no potential benefits, unlike for a notary cluster with
|
||||||
|
several members who might have different availability.
|
||||||
|
|
||||||
* New configuration property `database.initialiseAppSchema` with values `UPDATE`, `VALIDATE` and `NONE`.
|
* New configuration property `database.initialiseAppSchema` with values `UPDATE`, `VALIDATE` and `NONE`.
|
||||||
The property controls the behavior of the Hibernate DDL generation. `UPDATE` performs an update of CorDapp schemas, while
|
The property controls the behavior of the Hibernate DDL generation. `UPDATE` performs an update of CorDapp schemas, while
|
||||||
`VALID` only verifies their integrity. The property does not affect the node-specific DDL handling and
|
`VALID` only verifies their integrity. The property does not affect the node-specific DDL handling and
|
||||||
|
@ -117,13 +117,15 @@ The available config fields are listed below.
|
|||||||
:additionalP2PAddresses: An array of additional host:port values, which will be included in the advertised NodeInfo in the network map in addition to the ``p2pAddress``.
|
:additionalP2PAddresses: An array of additional host:port values, which will be included in the advertised NodeInfo in the network map in addition to the ``p2pAddress``.
|
||||||
Nodes can use this configuration option to advertise HA endpoints and aliases to external parties. If not specified the default value is an empty list.
|
Nodes can use this configuration option to advertise HA endpoints and aliases to external parties. If not specified the default value is an empty list.
|
||||||
|
|
||||||
:flowTimeout: When a flow implementing the ``TimedFlow`` interface does not complete in time, it is restarted from the
|
:flowTimeout: When a flow implementing the ``TimedFlow`` interface and setting the ``isTimeoutEnabled`` flag does not complete within a
|
||||||
initial checkpoint. Currently only used for notarisation requests: if a notary replica dies while processing a notarisation request,
|
defined elapsed time, it is restarted from the initial checkpoint. Currently only used for notarisation requests with clustered
|
||||||
the client flow eventually times out and gets restarted. On restart the request is resent to a different notary replica
|
notaries: if a notary cluster member dies while processing a notarisation request, the client flow eventually times out and gets
|
||||||
in a round-robin fashion (assuming the notary is clustered).
|
restarted. On restart the request is resent to a different notary cluster member in a round-robin fashion. Note that the flow will
|
||||||
|
keep retrying forever.
|
||||||
|
|
||||||
:timeout: The initial flow timeout period, e.g. `30 seconds`.
|
:timeout: The initial flow timeout period, e.g. `30 seconds`.
|
||||||
:maxRestartCount: Maximum number of times the flow will restart before resulting in an error.
|
:maxRestartCount: The number of retries the back-off time keeps growing for. For subsequent retries, the timeout value will remain
|
||||||
|
constant.
|
||||||
:backoffBase: The base of the exponential backoff, `t_{wait} = timeout * backoffBase^{retryCount}`.
|
:backoffBase: The base of the exponential backoff, `t_{wait} = timeout * backoffBase^{retryCount}`.
|
||||||
|
|
||||||
:rpcAddress: (Deprecated) The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block.
|
:rpcAddress: (Deprecated) The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block.
|
||||||
|
@ -80,7 +80,7 @@ sealed class Event {
|
|||||||
*
|
*
|
||||||
* @param subFlowClass the [Class] of the subflow, to be used to determine whether it's Initiating or inlined.
|
* @param subFlowClass the [Class] of the subflow, to be used to determine whether it's Initiating or inlined.
|
||||||
*/
|
*/
|
||||||
data class EnterSubFlow(val subFlowClass: Class<FlowLogic<*>>, val subFlowVersion: SubFlowVersion ) : Event()
|
data class EnterSubFlow(val subFlowClass: Class<FlowLogic<*>>, val subFlowVersion: SubFlowVersion, val isEnabledTimedFlow: Boolean) : Event()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Signal the leaving of a subflow.
|
* Signal the leaving of a subflow.
|
||||||
|
@ -24,6 +24,7 @@ import net.corda.node.services.api.ServiceHubInternal
|
|||||||
import net.corda.node.services.logging.pushToLoggingContext
|
import net.corda.node.services.logging.pushToLoggingContext
|
||||||
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
import net.corda.node.services.statemachine.transitions.FlowContinuation
|
||||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||||
|
import net.corda.node.utilities.isEnabledTimedFlow
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||||
@ -273,7 +274,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
Event.EnterSubFlow(subFlow.javaClass,
|
Event.EnterSubFlow(subFlow.javaClass,
|
||||||
createSubFlowVersion(
|
createSubFlowVersion(
|
||||||
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion
|
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion
|
||||||
)
|
),
|
||||||
|
subFlow.isEnabledTimedFlow()
|
||||||
),
|
),
|
||||||
isDbTransactionOpenOnEntry = true,
|
isDbTransactionOpenOnEntry = true,
|
||||||
isDbTransactionOpenOnExit = true
|
isDbTransactionOpenOnExit = true
|
||||||
|
@ -6,4 +6,4 @@ import net.corda.core.CordaException
|
|||||||
* This exception is fired once the retry timeout of a [TimedFlow] expires.
|
* This exception is fired once the retry timeout of a [TimedFlow] expires.
|
||||||
* It will indicate to the flow hospital to restart the flow.
|
* It will indicate to the flow hospital to restart the flow.
|
||||||
*/
|
*/
|
||||||
data class FlowTimeoutException(val maxRetries: Int) : CordaException("replaying flow from the last checkpoint")
|
class FlowTimeoutException : CordaException("replaying flow from the last checkpoint")
|
@ -15,7 +15,6 @@ import net.corda.core.flows.StateMachineRunId
|
|||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.FlowStateMachine
|
import net.corda.core.internal.FlowStateMachine
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.TimedFlow
|
|
||||||
import net.corda.core.internal.bufferUntilSubscribed
|
import net.corda.core.internal.bufferUntilSubscribed
|
||||||
import net.corda.core.internal.castIfPossible
|
import net.corda.core.internal.castIfPossible
|
||||||
import net.corda.core.internal.concurrent.OpenFuture
|
import net.corda.core.internal.concurrent.OpenFuture
|
||||||
@ -46,6 +45,7 @@ import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
|
|||||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.node.utilities.injectOldProgressTracker
|
import net.corda.node.utilities.injectOldProgressTracker
|
||||||
|
import net.corda.node.utilities.isEnabledTimedFlow
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
|
||||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||||
@ -54,6 +54,7 @@ import org.apache.activemq.artemis.utils.ReusableLatch
|
|||||||
import org.apache.logging.log4j.LogManager
|
import org.apache.logging.log4j.LogManager
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
|
import java.lang.Integer.min
|
||||||
import java.security.SecureRandom
|
import java.security.SecureRandom
|
||||||
import java.util.HashSet
|
import java.util.HashSet
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
@ -544,7 +545,15 @@ class SingleThreadedStateMachineManager(
|
|||||||
|
|
||||||
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
|
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
|
||||||
|
|
||||||
val initialCheckpoint = Checkpoint.create(invocationContext, flowStart, flowLogic.javaClass, frozenFlowLogic, ourIdentity, flowCorDappVersion).getOrThrow()
|
val initialCheckpoint = Checkpoint.create(
|
||||||
|
invocationContext,
|
||||||
|
flowStart,
|
||||||
|
flowLogic.javaClass,
|
||||||
|
frozenFlowLogic,
|
||||||
|
ourIdentity,
|
||||||
|
flowCorDappVersion,
|
||||||
|
flowLogic.isEnabledTimedFlow()
|
||||||
|
).getOrThrow()
|
||||||
val startedFuture = openFuture<Unit>()
|
val startedFuture = openFuture<Unit>()
|
||||||
val initialState = StateMachineState(
|
val initialState = StateMachineState(
|
||||||
checkpoint = initialCheckpoint,
|
checkpoint = initialCheckpoint,
|
||||||
@ -627,7 +636,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
|
private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
|
||||||
return with(serviceHub.configuration.flowTimeout) {
|
return with(serviceHub.configuration.flowTimeout) {
|
||||||
timeoutScheduler.schedule({
|
timeoutScheduler.schedule({
|
||||||
val event = Event.Error(FlowTimeoutException(maxRestartCount))
|
val event = Event.Error(FlowTimeoutException())
|
||||||
flow.fiber.scheduleEvent(event)
|
flow.fiber.scheduleEvent(event)
|
||||||
}, delay, TimeUnit.SECONDS)
|
}, delay, TimeUnit.SECONDS)
|
||||||
}
|
}
|
||||||
@ -635,7 +644,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
|
|
||||||
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
private fun calculateDefaultTimeoutSeconds(retryCount: Int): Long {
|
||||||
return with(serviceHub.configuration.flowTimeout) {
|
return with(serviceHub.configuration.flowTimeout) {
|
||||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, min(retryCount, maxRestartCount).toDouble()).toLong()
|
||||||
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
maxOf(1L, ((1.0 + Math.random()) * timeoutDelaySeconds / 2).toLong())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -761,7 +770,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
oldFlow.resultFuture.captureLater(flow.resultFuture)
|
oldFlow.resultFuture.captureLater(flow.resultFuture)
|
||||||
}
|
}
|
||||||
val flowLogic = flow.fiber.logic
|
val flowLogic = flow.fiber.logic
|
||||||
if (flowLogic is TimedFlow) scheduleTimeout(id)
|
if (flowLogic.isEnabledTimedFlow()) scheduleTimeout(id)
|
||||||
flow.fiber.scheduleEvent(Event.DoRemainingWork)
|
flow.fiber.scheduleEvent(Event.DoRemainingWork)
|
||||||
when (checkpoint.flowState) {
|
when (checkpoint.flowState) {
|
||||||
is FlowState.Unstarted -> {
|
is FlowState.Unstarted -> {
|
||||||
|
@ -292,15 +292,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
|||||||
object DoctorTimeout : Staff {
|
object DoctorTimeout : Staff {
|
||||||
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||||
if (newError is FlowTimeoutException) {
|
if (newError is FlowTimeoutException) {
|
||||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this, currentState)) {
|
return Diagnosis.DISCHARGE
|
||||||
return Diagnosis.DISCHARGE
|
|
||||||
} else {
|
|
||||||
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
|
|
||||||
"If the flow involves notarising a transaction, it means that no response was received from the notary." +
|
|
||||||
"This could be either due to the the notary being overloaded or unable to reach this node."
|
|
||||||
newError.setMessage(errorMsg)
|
|
||||||
log.warn(errorMsg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return Diagnosis.NOT_MY_SPECIALTY
|
return Diagnosis.NOT_MY_SPECIALTY
|
||||||
}
|
}
|
||||||
|
@ -69,9 +69,10 @@ data class Checkpoint(
|
|||||||
flowLogicClass: Class<FlowLogic<*>>,
|
flowLogicClass: Class<FlowLogic<*>>,
|
||||||
frozenFlowLogic: SerializedBytes<FlowLogic<*>>,
|
frozenFlowLogic: SerializedBytes<FlowLogic<*>>,
|
||||||
ourIdentity: Party,
|
ourIdentity: Party,
|
||||||
subFlowVersion: SubFlowVersion
|
subFlowVersion: SubFlowVersion,
|
||||||
|
isEnabledTimedFlow: Boolean
|
||||||
): Try<Checkpoint> {
|
): Try<Checkpoint> {
|
||||||
return SubFlow.create(flowLogicClass, subFlowVersion).map { topLevelSubFlow ->
|
return SubFlow.create(flowLogicClass, subFlowVersion, isEnabledTimedFlow).map { topLevelSubFlow ->
|
||||||
Checkpoint(
|
Checkpoint(
|
||||||
invocationContext = invocationContext,
|
invocationContext = invocationContext,
|
||||||
ourIdentity = ourIdentity,
|
ourIdentity = ourIdentity,
|
||||||
|
@ -16,10 +16,12 @@ sealed class SubFlow {
|
|||||||
// Version of the code.
|
// Version of the code.
|
||||||
abstract val subFlowVersion: SubFlowVersion
|
abstract val subFlowVersion: SubFlowVersion
|
||||||
|
|
||||||
|
abstract val isEnabledTimedFlow: Boolean
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An inlined subflow.
|
* An inlined subflow.
|
||||||
*/
|
*/
|
||||||
data class Inlined(override val flowClass: Class<FlowLogic<*>>, override val subFlowVersion: SubFlowVersion) : SubFlow()
|
data class Inlined(override val flowClass: Class<FlowLogic<*>>, override val subFlowVersion: SubFlowVersion, override val isEnabledTimedFlow: Boolean) : SubFlow()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An initiating subflow.
|
* An initiating subflow.
|
||||||
@ -32,21 +34,22 @@ sealed class SubFlow {
|
|||||||
override val flowClass: Class<FlowLogic<*>>,
|
override val flowClass: Class<FlowLogic<*>>,
|
||||||
val classToInitiateWith: Class<in FlowLogic<*>>,
|
val classToInitiateWith: Class<in FlowLogic<*>>,
|
||||||
val flowInfo: FlowInfo,
|
val flowInfo: FlowInfo,
|
||||||
override val subFlowVersion: SubFlowVersion
|
override val subFlowVersion: SubFlowVersion,
|
||||||
|
override val isEnabledTimedFlow: Boolean
|
||||||
) : SubFlow()
|
) : SubFlow()
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun create(flowClass: Class<FlowLogic<*>>, subFlowVersion: SubFlowVersion): Try<SubFlow> {
|
fun create(flowClass: Class<FlowLogic<*>>, subFlowVersion: SubFlowVersion, isEnabledTimedFlow: Boolean): Try<SubFlow> {
|
||||||
// Are we an InitiatingFlow?
|
// Are we an InitiatingFlow?
|
||||||
val initiatingAnnotations = getInitiatingFlowAnnotations(flowClass)
|
val initiatingAnnotations = getInitiatingFlowAnnotations(flowClass)
|
||||||
return when (initiatingAnnotations.size) {
|
return when (initiatingAnnotations.size) {
|
||||||
0 -> {
|
0 -> {
|
||||||
Try.Success(Inlined(flowClass, subFlowVersion))
|
Try.Success(Inlined(flowClass, subFlowVersion, isEnabledTimedFlow))
|
||||||
}
|
}
|
||||||
1 -> {
|
1 -> {
|
||||||
val initiatingAnnotation = initiatingAnnotations[0]
|
val initiatingAnnotation = initiatingAnnotations[0]
|
||||||
val flowContext = FlowInfo(initiatingAnnotation.second.version, flowClass.appName)
|
val flowContext = FlowInfo(initiatingAnnotation.second.version, flowClass.appName)
|
||||||
Try.Success(Initiating(flowClass, initiatingAnnotation.first, flowContext, subFlowVersion))
|
Try.Success(Initiating(flowClass, initiatingAnnotation.first, flowContext, subFlowVersion, isEnabledTimedFlow))
|
||||||
}
|
}
|
||||||
else -> {
|
else -> {
|
||||||
Try.Failure(IllegalArgumentException("${InitiatingFlow::class.java.name} can only be annotated " +
|
Try.Failure(IllegalArgumentException("${InitiatingFlow::class.java.name} can only be annotated " +
|
||||||
|
@ -2,7 +2,6 @@ package net.corda.node.services.statemachine.transitions
|
|||||||
|
|
||||||
import net.corda.core.flows.InitiatingFlow
|
import net.corda.core.flows.InitiatingFlow
|
||||||
import net.corda.core.internal.FlowIORequest
|
import net.corda.core.internal.FlowIORequest
|
||||||
import net.corda.core.internal.TimedFlow
|
|
||||||
import net.corda.core.utilities.Try
|
import net.corda.core.utilities.Try
|
||||||
import net.corda.node.services.statemachine.*
|
import net.corda.node.services.statemachine.*
|
||||||
|
|
||||||
@ -93,13 +92,10 @@ class TopLevelTransition(
|
|||||||
|
|
||||||
private fun enterSubFlowTransition(event: Event.EnterSubFlow): TransitionResult {
|
private fun enterSubFlowTransition(event: Event.EnterSubFlow): TransitionResult {
|
||||||
return builder {
|
return builder {
|
||||||
val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion)
|
val subFlow = SubFlow.create(event.subFlowClass, event.subFlowVersion, event.isEnabledTimedFlow)
|
||||||
when (subFlow) {
|
when (subFlow) {
|
||||||
is Try.Success -> {
|
is Try.Success -> {
|
||||||
val containsTimedSubFlows = currentState.checkpoint.subFlowStack.any {
|
val containsTimedSubflow = containsTimedFlows(currentState.checkpoint.subFlowStack)
|
||||||
TimedFlow::class.java.isAssignableFrom(it.flowClass)
|
|
||||||
}
|
|
||||||
val isCurrentSubFlowTimed = TimedFlow::class.java.isAssignableFrom(event.subFlowClass)
|
|
||||||
currentState = currentState.copy(
|
currentState = currentState.copy(
|
||||||
checkpoint = currentState.checkpoint.copy(
|
checkpoint = currentState.checkpoint.copy(
|
||||||
subFlowStack = currentState.checkpoint.subFlowStack + subFlow.value
|
subFlowStack = currentState.checkpoint.subFlowStack + subFlow.value
|
||||||
@ -107,7 +103,7 @@ class TopLevelTransition(
|
|||||||
)
|
)
|
||||||
// We don't schedule a timeout if there already is a timed subflow on the stack - a timeout had
|
// We don't schedule a timeout if there already is a timed subflow on the stack - a timeout had
|
||||||
// been scheduled already.
|
// been scheduled already.
|
||||||
if (isCurrentSubFlowTimed && !containsTimedSubFlows) {
|
if (event.isEnabledTimedFlow && !containsTimedSubflow) {
|
||||||
actions.add(Action.ScheduleFlowTimeout(currentState.flowLogic.runId))
|
actions.add(Action.ScheduleFlowTimeout(currentState.flowLogic.runId))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -125,8 +121,7 @@ class TopLevelTransition(
|
|||||||
if (checkpoint.subFlowStack.isEmpty()) {
|
if (checkpoint.subFlowStack.isEmpty()) {
|
||||||
freshErrorTransition(UnexpectedEventInState())
|
freshErrorTransition(UnexpectedEventInState())
|
||||||
} else {
|
} else {
|
||||||
val lastSubFlowClass = checkpoint.subFlowStack.last().flowClass
|
val isLastSubFlowTimed = checkpoint.subFlowStack.last().isEnabledTimedFlow
|
||||||
val isLastSubFlowTimed = TimedFlow::class.java.isAssignableFrom(lastSubFlowClass)
|
|
||||||
val newSubFlowStack = checkpoint.subFlowStack.dropLast(1)
|
val newSubFlowStack = checkpoint.subFlowStack.dropLast(1)
|
||||||
currentState = currentState.copy(
|
currentState = currentState.copy(
|
||||||
checkpoint = checkpoint.copy(
|
checkpoint = checkpoint.copy(
|
||||||
@ -142,7 +137,7 @@ class TopLevelTransition(
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun containsTimedFlows(subFlowStack: List<SubFlow>): Boolean {
|
private fun containsTimedFlows(subFlowStack: List<SubFlow>): Boolean {
|
||||||
return subFlowStack.any { TimedFlow::class.java.isAssignableFrom(it.flowClass) }
|
return subFlowStack.any { it.isEnabledTimedFlow }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun suspendTransition(event: Event.Suspend): TransitionResult {
|
private fun suspendTransition(event: Event.Suspend): TransitionResult {
|
||||||
|
@ -0,0 +1,12 @@
|
|||||||
|
package net.corda.node.utilities
|
||||||
|
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.internal.TimedFlow
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a flow logic is a [TimedFlow] and if yes whether it actually can be restarted. Only flows that match both criteria should time
|
||||||
|
* out and get restarted.
|
||||||
|
*/
|
||||||
|
internal fun FlowLogic<*>.isEnabledTimedFlow(): Boolean {
|
||||||
|
return (this as? TimedFlow)?.isTimeoutEnabled ?: false
|
||||||
|
}
|
@ -190,7 +190,8 @@ class DBCheckpointStorageTests {
|
|||||||
override fun call() {}
|
override fun call() {}
|
||||||
}
|
}
|
||||||
val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
|
val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
|
||||||
val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, SubFlowVersion.CoreFlow(version)).getOrThrow()
|
val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, SubFlowVersion.CoreFlow(version), false)
|
||||||
|
.getOrThrow()
|
||||||
return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
|
return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,6 +66,8 @@ class IdempotentFlowTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private class TimedSubflow : FlowLogic<Unit>(), TimedFlow {
|
private class TimedSubflow : FlowLogic<Unit>(), TimedFlow {
|
||||||
|
override val isTimeoutEnabled: Boolean = true
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun call() {
|
override fun call() {
|
||||||
subFlowExecutionCounter.incrementAndGet() // No checkpoint should be taken before invoking IdempotentSubFlow,
|
subFlowExecutionCounter.incrementAndGet() // No checkpoint should be taken before invoking IdempotentSubFlow,
|
||||||
|
Loading…
Reference in New Issue
Block a user