CORDA-3470 Fix flow async operations (#5780)

This commit is contained in:
Rick Parker 2019-12-02 14:40:07 +00:00 committed by GitHub
parent 1caeeb01de
commit 6e467f20b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 7 deletions

View File

@ -2,7 +2,12 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.*
import com.codahale.metrics.Gauge
import com.codahale.metrics.Histogram
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.Reservoir
import com.codahale.metrics.SlidingTimeWindowArrayReservoir
import com.codahale.metrics.SlidingTimeWindowReservoir
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -239,7 +244,7 @@ class ActionExecutorImpl(
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
},
failure = { exception ->
fiber.scheduleEvent(Event.Error(exception))
fiber.scheduleEvent(Event.AsyncOperationThrows(exception))
}
)
} catch (e: Exception) {

View File

@ -134,6 +134,15 @@ sealed class Event {
*/
data class AsyncOperationCompletion(val returnValue: Any?) : Event()
/**
* Signals the faiure of a [FlowAsyncOperation].
*
* Scheduling is triggered by the service that completes the future returned by the async operation.
*
* @param throwable the exception thrown by the operation.
*/
data class AsyncOperationThrows(val throwable: Throwable) : Event()
/**
* Retry a flow from the last checkpoint, or if there is no checkpoint, restart the flow with the same invocation details.
*/

View File

@ -3,7 +3,22 @@ package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.Try
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.EndSessionMessage
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.FlowSessionImpl
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.SubFlow
/**
* This is the top level event-handling transition function capable of handling any [Event].
@ -30,6 +45,7 @@ class TopLevelTransition(
is Event.FlowFinish -> flowFinishTransition(event)
is Event.InitiateFlow -> initiateFlowTransition(event)
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition(startingState)
}
}
@ -258,6 +274,12 @@ class TopLevelTransition(
}
}
private fun asyncOperationThrowsTransition(event: Event.AsyncOperationThrows): TransitionResult {
return builder {
resumeFlowLogic(event.throwable)
}
}
private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult {
return builder {
// Need to create a flow from the prior checkpoint or flow initiation.

View File

@ -1,7 +1,12 @@
package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.IdentifiableException
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.StateMachineState
// This is a file defining some common utilities for creating state machine transitions.
@ -66,6 +71,12 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
currentState = currentState.copy(isFlowResumed = true)
return FlowContinuation.Resume(result)
}
fun resumeFlowLogic(result: Throwable): FlowContinuation {
actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true)
return FlowContinuation.Throw(result)
}
}
class CannotFindSessionException(sessionId: SessionId) : IllegalStateException("Couldn't find session with id $sessionId")

View File

@ -11,7 +11,12 @@ import net.corda.core.internal.executeAsync
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.testing.node.internal.*
import net.corda.core.utilities.getOrThrow
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -63,17 +68,35 @@ class FlowAsyncOperationTests {
}
}
assertFailsWith<ExecutionException> { aliceNode.services.startFlow(flow).resultFuture.get() }
assertFailsWith<SpecialException> { aliceNode.services.startFlow(flow).resultFuture.getOrThrow() }
}
@Test
fun `operation result errors are propagated correctly, and can be caught by the flow`() {
val flow = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
try {
executeAsync(ErroredResult())
} catch (e: SpecialException) {
// Suppress
}
}
}
aliceNode.services.startFlow(flow).resultFuture.get()
}
private class ErroredResult : FlowAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CordaFuture<Unit> {
val future = openFuture<Unit>()
future.setException(Exception())
future.setException(SpecialException())
return future
}
}
private class SpecialException : Exception()
@Test(timeout = 30_000)
fun `flows waiting on an async operation do not block the thread`() {
// Kick off 10 flows that submit a task to the service and wait until completion