mirror of
https://github.com/corda/corda.git
synced 2025-06-15 13:48:14 +00:00
[ENT-1774] FlowAsyncOperation deduplication ID (#4068)
This commit is contained in:
@ -3,9 +3,13 @@ package net.corda.node.flows
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowAsyncOperation
|
||||
import net.corda.core.internal.IdempotentFlow
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.executeAsync
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
@ -56,6 +60,21 @@ class FlowRetryTest {
|
||||
assertEquals("$numSessions:$numIterations", result)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `async operation deduplication id is stable accross retries`() {
|
||||
val user = User("mark", "dadada", setOf(Permissions.startFlow<AsyncRetryFlow>()))
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = isQuasarAgentSpecified(),
|
||||
notarySpecs = emptyList()
|
||||
)) {
|
||||
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::AsyncRetryFlow).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `flow gives up after number of exceptions, even if this is the first line of the flow`() {
|
||||
val user = User("mark", "dadada", setOf(Permissions.startFlow<RetryFlow>()))
|
||||
@ -218,6 +237,36 @@ class RetryFlow() : FlowLogic<String>(), IdempotentFlow {
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class AsyncRetryFlow() : FlowLogic<String>(), IdempotentFlow {
|
||||
companion object {
|
||||
object FIRST_STEP : ProgressTracker.Step("Step one")
|
||||
|
||||
fun tracker() = ProgressTracker(FIRST_STEP)
|
||||
|
||||
val deduplicationIds = mutableSetOf<String>()
|
||||
}
|
||||
|
||||
class RecordDeduplicationId: FlowAsyncOperation<String> {
|
||||
override fun execute(deduplicationId: String): CordaFuture<String> {
|
||||
val dedupeIdIsNew = deduplicationIds.add(deduplicationId)
|
||||
if (dedupeIdIsNew) {
|
||||
throw ExceptionToCauseFiniteRetry()
|
||||
}
|
||||
return doneFuture(deduplicationId)
|
||||
}
|
||||
}
|
||||
|
||||
override val progressTracker = tracker()
|
||||
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
progressTracker.currentStep = FIRST_STEP
|
||||
executeAsync(RecordDeduplicationId())
|
||||
return "Result"
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class ThrowingFlow() : FlowLogic<String>(), IdempotentFlow {
|
||||
companion object {
|
||||
@ -237,4 +286,4 @@ class ThrowingFlow() : FlowLogic<String>(), IdempotentFlow {
|
||||
progressTracker.currentStep = FIRST_STEP
|
||||
return "Result"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +124,7 @@ sealed class Action {
|
||||
/**
|
||||
* Execute the specified [operation].
|
||||
*/
|
||||
data class ExecuteAsyncOperation(val operation: FlowAsyncOperation<*>) : Action()
|
||||
data class ExecuteAsyncOperation(val deduplicationId: String, val operation: FlowAsyncOperation<*>) : Action()
|
||||
|
||||
/**
|
||||
* Release soft locks associated with given ID (currently the flow ID).
|
||||
|
@ -221,7 +221,7 @@ class ActionExecutorImpl(
|
||||
|
||||
@Suspendable
|
||||
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
|
||||
val operationFuture = action.operation.execute()
|
||||
val operationFuture = action.operation.execute(action.deduplicationId)
|
||||
operationFuture.thenMatch(
|
||||
success = { result ->
|
||||
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
|
||||
|
@ -411,7 +411,10 @@ class StartedFlowTransition(
|
||||
|
||||
private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult {
|
||||
return builder {
|
||||
actions.add(Action.ExecuteAsyncOperation(flowIORequest.operation))
|
||||
// The `numberOfSuspends` is added to the deduplication ID in case an async
|
||||
// operation is executed multiple times within the same flow.
|
||||
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString()
|
||||
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
|
||||
FlowContinuation.ProcessEvents
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user