mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
CORDA-1645: Checkpoint before calling an idempotent sub-flow. (#3399)
* CORDA-1645: Checkpoint before calling an idempotent sub-flow. When an idempotent sub-flow causes a flow restart, the flow will be replayed from the last checkpoint before the sub-flow invocation. However, the logic between the last checkpoint and the sub-flow invocation may contain side-effects which shouldn't be replayed. Thus we need to persist a checkpoint just before an idempotent sub-flow is invoked.
This commit is contained in:
@ -85,4 +85,11 @@ sealed class FlowIORequest<out R : Any> {
|
|||||||
* Execute the specified [operation], suspend the flow until completion.
|
* Execute the specified [operation], suspend the flow until completion.
|
||||||
*/
|
*/
|
||||||
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
|
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Indicates that no actual IO request occurred, and the flow should be resumed immediately.
|
||||||
|
* This is used for performing explicit checkpointing anywhere in a flow.
|
||||||
|
*/
|
||||||
|
// TODO: consider using an empty FlowAsyncOperation instead
|
||||||
|
object ForceCheckpoint : FlowIORequest<Unit>()
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import net.corda.core.cordapp.Cordapp
|
|||||||
import net.corda.core.cordapp.CordappConfig
|
import net.corda.core.cordapp.CordappConfig
|
||||||
import net.corda.core.cordapp.CordappContext
|
import net.corda.core.cordapp.CordappContext
|
||||||
import net.corda.core.crypto.*
|
import net.corda.core.crypto.*
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.node.ServicesForResolution
|
import net.corda.core.node.ServicesForResolution
|
||||||
import net.corda.core.serialization.*
|
import net.corda.core.serialization.*
|
||||||
@ -515,6 +516,11 @@ fun createCordappContext(cordapp: Cordapp, attachmentId: SecureHash?, classLoade
|
|||||||
|
|
||||||
val PublicKey.hash: SecureHash get() = encoded.sha256()
|
val PublicKey.hash: SecureHash get() = encoded.sha256()
|
||||||
|
|
||||||
|
/** Checks if this flow is an idempotent flow. */
|
||||||
|
fun Class<out FlowLogic<*>>.isIdempotentFlow(): Boolean {
|
||||||
|
return IdempotentFlow::class.java.isAssignableFrom(this)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extension method for providing a sumBy method that processes and returns a Long
|
* Extension method for providing a sumBy method that processes and returns a Long
|
||||||
*/
|
*/
|
||||||
|
@ -254,6 +254,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
|
override fun <R> subFlow(subFlow: FlowLogic<R>): R {
|
||||||
|
checkpointIfSubflowIdempotent(subFlow.javaClass)
|
||||||
processEventImmediately(
|
processEventImmediately(
|
||||||
Event.EnterSubFlow(subFlow.javaClass,
|
Event.EnterSubFlow(subFlow.javaClass,
|
||||||
createSubFlowVersion(
|
createSubFlowVersion(
|
||||||
@ -274,6 +275,21 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the sub-flow is [IdempotentFlow] we need to perform a checkpoint to make sure any potentially side-effect
|
||||||
|
* generating logic between the last checkpoint and the sub-flow invocation does not get replayed if the
|
||||||
|
* flow restarts.
|
||||||
|
*
|
||||||
|
* We don't checkpoint if the current flow is [IdempotentFlow] as well.
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
private fun checkpointIfSubflowIdempotent(subFlow: Class<FlowLogic<*>>) {
|
||||||
|
val currentFlow = snapshot().checkpoint.subFlowStack.last().flowClass
|
||||||
|
if (!currentFlow.isIdempotentFlow() && subFlow.isIdempotentFlow()) {
|
||||||
|
suspend(FlowIORequest.ForceCheckpoint, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun initiateFlow(party: Party): FlowSession {
|
override fun initiateFlow(party: Party): FlowSession {
|
||||||
val resume = processEventImmediately(
|
val resume = processEventImmediately(
|
||||||
|
@ -206,8 +206,8 @@ class StaffedFlowHospital {
|
|||||||
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
if (history.notDischargedForTheSameThingMoreThan(newError.maxRetries, this)) {
|
||||||
return Diagnosis.DISCHARGE
|
return Diagnosis.DISCHARGE
|
||||||
} else {
|
} else {
|
||||||
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}." +
|
val errorMsg = "Maximum number of retries reached for flow ${flowFiber.snapshot().flowLogic.javaClass}. " +
|
||||||
"If the flow involves notarising a transaction, this usually means that the notary is being overloaded and" +
|
"If the flow involves notarising a transaction, this usually means that the notary is being overloaded and " +
|
||||||
"unable to service requests fast enough. Please try again later."
|
"unable to service requests fast enough. Please try again later."
|
||||||
newError.setMessage(errorMsg)
|
newError.setMessage(errorMsg)
|
||||||
log.warn(errorMsg)
|
log.warn(errorMsg)
|
||||||
|
@ -6,22 +6,7 @@ import net.corda.core.flows.UnexpectedFlowEndException
|
|||||||
import net.corda.core.internal.FlowIORequest
|
import net.corda.core.internal.FlowIORequest
|
||||||
import net.corda.core.serialization.SerializedBytes
|
import net.corda.core.serialization.SerializedBytes
|
||||||
import net.corda.core.utilities.toNonEmptySet
|
import net.corda.core.utilities.toNonEmptySet
|
||||||
import net.corda.node.services.statemachine.Action
|
import net.corda.node.services.statemachine.*
|
||||||
import net.corda.node.services.statemachine.Checkpoint
|
|
||||||
import net.corda.node.services.statemachine.DataSessionMessage
|
|
||||||
import net.corda.node.services.statemachine.DeduplicationId
|
|
||||||
import net.corda.node.services.statemachine.ExistingSessionMessage
|
|
||||||
import net.corda.node.services.statemachine.FlowError
|
|
||||||
import net.corda.node.services.statemachine.FlowSessionImpl
|
|
||||||
import net.corda.node.services.statemachine.FlowState
|
|
||||||
import net.corda.node.services.statemachine.InitialSessionMessage
|
|
||||||
import net.corda.node.services.statemachine.InitiatedSessionState
|
|
||||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
|
||||||
import net.corda.node.services.statemachine.SessionId
|
|
||||||
import net.corda.node.services.statemachine.SessionMap
|
|
||||||
import net.corda.node.services.statemachine.SessionState
|
|
||||||
import net.corda.node.services.statemachine.StateMachineState
|
|
||||||
import net.corda.node.services.statemachine.SubFlow
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
|
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
|
||||||
@ -55,6 +40,7 @@ class StartedFlowTransition(
|
|||||||
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
|
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
|
||||||
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
|
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
|
||||||
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
|
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
|
||||||
|
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -400,6 +386,9 @@ class StartedFlowTransition(
|
|||||||
is FlowIORequest.ExecuteAsyncOperation<*> -> {
|
is FlowIORequest.ExecuteAsyncOperation<*> -> {
|
||||||
emptyList()
|
emptyList()
|
||||||
}
|
}
|
||||||
|
FlowIORequest.ForceCheckpoint -> {
|
||||||
|
emptyList()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -426,4 +415,8 @@ class StartedFlowTransition(
|
|||||||
FlowContinuation.ProcessEvents
|
FlowContinuation.ProcessEvents
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun executeForceCheckpoint(): TransitionResult {
|
||||||
|
return builder { resumeFlowLogic(Unit) }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,93 @@
|
|||||||
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import com.nhaarman.mockito_kotlin.doReturn
|
||||||
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
|
import net.corda.core.flows.InitiatingFlow
|
||||||
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.internal.IdempotentFlow
|
||||||
|
import net.corda.core.internal.TimedFlow
|
||||||
|
import net.corda.core.internal.packageName
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
|
import net.corda.node.internal.StartedNode
|
||||||
|
import net.corda.node.services.config.FlowTimeoutConfiguration
|
||||||
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
|
import net.corda.testing.node.internal.InternalMockNetwork
|
||||||
|
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||||
|
import net.corda.testing.node.internal.startFlow
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Test
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
class IdempotentFlowTests {
|
||||||
|
private lateinit var mockNet: InternalMockNetwork
|
||||||
|
private lateinit var nodeA: StartedNode<InternalMockNetwork.MockNode>
|
||||||
|
private lateinit var nodeB: StartedNode<InternalMockNetwork.MockNode>
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
val executionCounter = AtomicInteger(0)
|
||||||
|
val subFlowExecutionCounter = AtomicInteger(0)
|
||||||
|
val suspendedOnce = AtomicBoolean(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun start() {
|
||||||
|
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.packageName))
|
||||||
|
nodeA = mockNet.createNode(InternalMockNodeParameters(
|
||||||
|
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
|
||||||
|
configOverrides = {
|
||||||
|
conf: NodeConfiguration ->
|
||||||
|
val retryConfig = FlowTimeoutConfiguration(1.seconds, 3, 1.0)
|
||||||
|
doReturn(retryConfig).whenever(conf).flowTimeout
|
||||||
|
}
|
||||||
|
))
|
||||||
|
nodeB = mockNet.createNode()
|
||||||
|
mockNet.startNodes()
|
||||||
|
executionCounter.set(0)
|
||||||
|
subFlowExecutionCounter.set(0)
|
||||||
|
suspendedOnce.set(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun cleanUp() {
|
||||||
|
mockNet.stopNodes()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `restarting idempotent flow does not replay any part of its parent flow`() {
|
||||||
|
nodeA.services.startFlow(SideEffectFlow()).resultFuture.get()
|
||||||
|
assertEquals(1, executionCounter.get())
|
||||||
|
assertEquals(2, subFlowExecutionCounter.get())
|
||||||
|
}
|
||||||
|
|
||||||
|
@InitiatingFlow
|
||||||
|
private class SideEffectFlow : FlowLogic<Unit>() {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
executionCounter.incrementAndGet() // This shouldn't be replayed when the TimedSubFlow restarts.
|
||||||
|
subFlow(TimedSubflow()) // Checkpoint should be taken before invoking the sub-flow.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TimedSubflow : FlowLogic<Unit>(), TimedFlow {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
subFlowExecutionCounter.incrementAndGet() // No checkpoint should be taken before invoking IdempotentSubFlow,
|
||||||
|
// so this should be replayed when TimedSubFlow restarts.
|
||||||
|
subFlow(IdempotentSubFlow()) // Checkpoint shouldn't be taken before invoking the sub-flow.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class IdempotentSubFlow : FlowLogic<Unit>(), IdempotentFlow {
|
||||||
|
@Suspendable
|
||||||
|
override fun call() {
|
||||||
|
if (!IdempotentFlowTests.suspendedOnce.getAndSet(true))
|
||||||
|
waitForLedgerCommit(SecureHash.zeroHash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user