ENT-1725: Introduce FlowAsyncOperation (#658)

* ENT-1725: Introduce FlowAsyncOperation, which allows suspending a flow on a custom operation, such as long running I/O requests, notary commit, etc.

* Move async execute to internal, add more tests.

* Add a 30s test timeout

* Update API doc
This commit is contained in:
Andrius Dagys 2018-04-04 09:52:49 +01:00 committed by GitHub
parent ff3d497d15
commit 5909a49c30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 232 additions and 5 deletions

View File

@ -16,10 +16,7 @@ import net.corda.core.CordaInternal
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate
import net.corda.core.internal.uncheckedCast
import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub

View File

@ -0,0 +1,33 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
/**
* Interface for arbitrary operations that can be invoked in a flow asynchronously - the flow will suspend until the
* operation completes. Operation parameters are expected to be injected via constructor.
*/
@CordaSerializable
interface FlowAsyncOperation<R : Any> {
/** Performs the operation in a non-blocking fashion. */
fun execute(): CordaFuture<R>
}
/** Executes the specified [operation] and suspends until operation completion. */
@Suspendable
fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, maySkipCheckpoint: Boolean = false): R {
val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint)
}

View File

@ -91,5 +91,10 @@ sealed class FlowIORequest<out R : Any> {
* Suspend the flow until all Initiating sessions are confirmed.
*/
object WaitForSessionConfirmations : FlowIORequest<Unit>()
/**
* Execute the specified [operation], suspend the flow until completion.
*/
data class ExecuteAsyncOperation<T : Any>(val operation: FlowAsyncOperation<T>) : FlowIORequest<T>()
}

View File

@ -13,6 +13,7 @@ package net.corda.node.services.statemachine
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.node.services.messaging.DeduplicationHandler
import java.time.Instant
@ -121,6 +122,11 @@ sealed class Action {
* Commit the current database transaction.
*/
object CommitTransaction : Action() { override fun toString() = "CommitTransaction" }
/**
* Execute the specified [operation].
*/
data class ExecuteAsyncOperation(val operation: FlowAsyncOperation<*>) : Action()
}
/**

View File

@ -81,6 +81,7 @@ class ActionExecutorImpl(
is Action.CreateTransaction -> executeCreateTransaction()
is Action.RollbackTransaction -> executeRollbackTransaction()
is Action.CommitTransaction -> executeCommitTransaction()
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
}
}
@ -218,6 +219,19 @@ class ActionExecutorImpl(
}
}
@Suspendable
private fun executeAsyncOperation(fiber: FlowFiber, action: Action.ExecuteAsyncOperation) {
val operationFuture = action.operation.execute()
operationFuture.thenMatch(
success = { result ->
fiber.scheduleEvent(Event.AsyncOperationCompletion(result))
},
failure = { exception ->
fiber.scheduleEvent(Event.Error(exception))
}
)
}
private fun serializeCheckpoint(checkpoint: Checkpoint): SerializedBytes<Checkpoint> {
return checkpoint.serialize(context = checkpointSerializationContext)
}

View File

@ -124,4 +124,13 @@ sealed class Event {
* @param returnValue the return value of the flow.
*/
data class FlowFinish(val returnValue: Any?) : Event()
/**
* Signals the completion of a [FlowAsyncOperation].
*
* Scheduling is triggered by the service that completes the future returned by the async operation.
*
* @param returnValue the result of the operation.
*/
data class AsyncOperationCompletion(val returnValue: Any?) : Event()
}

View File

@ -18,6 +18,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
@ -60,7 +61,10 @@ class FlowSessionImpl(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = false
)
return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!!.checkPayloadIs(receiveType)
val responseValues: Map<FlowSession, SerializedBytes<Any>> = getFlowStateMachine().suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues[this]!!
return responseForCurrentSession.checkPayloadIs(receiveType)
}
@Suspendable

View File

@ -49,6 +49,7 @@ class StartedFlowTransition(
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
}
}
@ -388,6 +389,9 @@ class StartedFlowTransition(
is FlowIORequest.WaitForSessionConfirmations -> {
collectErroredInitiatingSessionErrors(checkpoint)
}
is FlowIORequest.ExecuteAsyncOperation<*> -> {
emptyList()
}
}
}
@ -406,4 +410,11 @@ class StartedFlowTransition(
firstPayload = payload
)
}
private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult {
return builder {
actions.add(Action.ExecuteAsyncOperation(flowIORequest.operation))
FlowContinuation.ProcessEvents
}
}
}

View File

@ -39,6 +39,7 @@ class TopLevelTransition(
is Event.Suspend -> suspendTransition(event)
is Event.FlowFinish -> flowFinishTransition(event)
is Event.InitiateFlow -> initiateFlowTransition(event)
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
}
}
@ -243,4 +244,10 @@ class TopLevelTransition(
}
return null
}
private fun asyncOperationCompletionTransition(event: Event.AsyncOperationCompletion): TransitionResult {
return builder {
resumeFlowLogic(event.returnValue)
}
}
}

View File

@ -0,0 +1,141 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.executeAsync
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.internal.StartedNode
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutionException
import kotlin.test.assertFailsWith
class FlowAsyncOperationTests {
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: StartedNode<InternalMockNetwork.MockNode>
@Before
fun setup() {
mockNet = InternalMockNetwork(
cordappPackages = listOf("net.corda.testing.contracts", "net.corda.node.services.statemachine"),
notarySpecs = emptyList()
)
aliceNode = mockNet.createNode()
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `operation errors are propagated correctly`() {
val flow = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
executeAsync(ErroredExecute())
}
}
assertFailsWith<ExecutionException> { aliceNode.services.startFlow(flow).resultFuture.get() }
}
private class ErroredExecute : FlowAsyncOperation<Unit> {
override fun execute(): CordaFuture<Unit> {
throw Exception()
}
}
@Test
fun `operation result errors are propagated correctly`() {
val flow = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
executeAsync(ErroredResult())
}
}
assertFailsWith<ExecutionException> { aliceNode.services.startFlow(flow).resultFuture.get() }
}
private class ErroredResult : FlowAsyncOperation<Unit> {
override fun execute(): CordaFuture<Unit> {
val future = openFuture<Unit>()
future.setException(Exception())
return future
}
}
@Test(timeout = 30_000)
fun `flows waiting on an async operation do not block the thread`() {
// Kick off 10 flows that submit a task to the service and wait until completion
val numFlows = 10
val futures = (1..10).map {
aliceNode.services.startFlow(TestFlowWithAsyncAction(false)).resultFuture
}
// Make sure all flows submitted a task to the service and are awaiting completion
val service = aliceNode.services.cordaService(WorkerService::class.java)
while (service.pendingCount != numFlows) Thread.sleep(100)
// Complete all pending tasks. If async operations aren't handled as expected, and one of the previous flows is
// actually blocking the thread, the following flow will deadlock and the test won't finish.
aliceNode.services.startFlow(TestFlowWithAsyncAction(true)).resultFuture.get()
// Make sure all waiting flows completed successfully
futures.transpose().get()
}
private class TestFlowWithAsyncAction(val completeAllTasks: Boolean) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val scv = serviceHub.cordaService(WorkerService::class.java)
executeAsync(WorkerServiceTask(completeAllTasks, scv))
}
}
private class WorkerServiceTask(val completeAllTasks: Boolean, val service: WorkerService) : FlowAsyncOperation<Unit> {
override fun execute(): CordaFuture<Unit> {
return service.performTask(completeAllTasks)
}
}
/** A dummy worker service that queues up tasks and allows clearing the entire task backlog. */
@CordaService
class WorkerService(val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
private val pendingTasks = ConcurrentLinkedQueue<OpenFuture<Unit>>()
val pendingCount: Int get() = pendingTasks.count()
fun performTask(completeAllTasks: Boolean): CordaFuture<Unit> {
val taskFuture = openFuture<Unit>()
pendingTasks.add(taskFuture)
if (completeAllTasks) {
synchronized(this) {
while (!pendingTasks.isEmpty()) {
val fut = pendingTasks.poll()!!
fut.set(Unit)
}
}
}
return taskFuture
}
}
}