ENT-4601 Public API to run external operations from a flow (#5833)

Deprecate FlowAsyncOperation and reimplement public versions FlowExternalOperation and FlowExternalAsyncOperation.

await added to FlowLogic to allow easy calling from both Java and Kotlin. There are two overrides of await (one for FlowExternalOperation and FlowExternalAsyncOperation).

Implementations of FlowExternalOperation return a result (written as blocking code) from their execute function. This operation will then be executed using a thread provided by the externalOperationExecutor.

Implementations of FlowExternalAsyncOperation return a future from their execute function. This operation must be executed on a newly spawned thread or one provided by a thread pool. It is up to developers to handle threading in this scenario.

The default thread pool (externalOperationExecutor) can be configured through the flowExternalOperationThreadPoolSize node config.

The current implementation leaves FlowAsyncOperation alone, meaning that any developers that have used it (even though it is internal) won't need to change their apps. If this was not concern I would delete it completely and replumb the state machine code. Instead, it has been marked with @DoNotImplement and executeAsync is annotated with @Deprecated
This commit is contained in:
Dan Newton 2020-01-22 09:27:17 +00:00 committed by Rick Parker
parent 0978500a9a
commit 4bae045a58
29 changed files with 2235 additions and 129 deletions

View File

@ -2205,6 +2205,14 @@ public class net.corda.core.flows.FlowException extends net.corda.core.CordaExce
public final Long getOriginalErrorId()
public final void setOriginalErrorId(Long)
##
public interface net.corda.core.flows.FlowExternalAsyncOperation
@NotNull
public abstract java.util.concurrent.CompletableFuture<R> execute(String)
##
public interface net.corda.core.flows.FlowExternalOperation
@NotNull
public abstract R execute(String)
##
@CordaSerializable
public final class net.corda.core.flows.FlowInfo extends java.lang.Object
public <init>(int, String)
@ -2300,6 +2308,12 @@ public static final class net.corda.core.flows.FlowInitiator$Shell extends net.c
public abstract class net.corda.core.flows.FlowLogic extends java.lang.Object
public <init>()
@Suspendable
@NotNull
public final R await(net.corda.core.flows.FlowExternalAsyncOperation<R>)
@Suspendable
@NotNull
public final R await(net.corda.core.flows.FlowExternalOperation<R>)
@Suspendable
public abstract T call()
public final void checkFlowPermission(String, java.util.Map<String, String>)
@Suspendable

View File

@ -0,0 +1,198 @@
package net.corda.coretests.flows;
import co.paralleluniverse.fibers.Suspendable;
import net.corda.core.flows.FlowExternalAsyncOperation;
import net.corda.core.flows.FlowExternalOperation;
import net.corda.core.flows.StartableByRPC;
import net.corda.core.identity.Party;
import net.corda.core.utilities.KotlinUtilsKt;
import net.corda.testing.core.TestConstants;
import net.corda.testing.core.TestUtils;
import net.corda.testing.driver.DriverParameters;
import net.corda.testing.driver.NodeHandle;
import net.corda.testing.driver.NodeParameters;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import static net.corda.testing.driver.Driver.driver;
import static org.junit.Assert.assertEquals;
public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperationTest {
@Test
public void awaitFlowExternalOperationInJava() {
driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
NodeHandle alice = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)),
Duration.of(20, ChronoUnit.SECONDS)
);
NodeHandle bob = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
Duration.of(20, ChronoUnit.SECONDS)
);
return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
FlowWithExternalOperationInJava.class,
TestUtils.singleIdentity(bob.getNodeInfo())
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
});
}
@Test
public void awaitFlowExternalAsyncOperationInJava() {
driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
NodeHandle alice = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)),
Duration.of(20, ChronoUnit.SECONDS)
);
NodeHandle bob = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
Duration.of(20, ChronoUnit.SECONDS)
);
return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
FlowWithExternalAsyncOperationInJava.class,
TestUtils.singleIdentity(bob.getNodeInfo())
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
});
}
@Test
public void awaitFlowExternalOperationInJavaCanBeRetried() {
driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
NodeHandle alice = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)),
Duration.of(20, ChronoUnit.SECONDS)
);
NodeHandle bob = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
Duration.of(20, ChronoUnit.SECONDS)
);
KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
FlowWithExternalOperationThatGetsRetriedInJava.class,
TestUtils.singleIdentity(bob.getNodeInfo())
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
HospitalCounts counts = KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
GetHospitalCountersFlow.class
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
assertEquals(1, counts.getDischarge());
assertEquals(0, counts.getObservation());
return null;
});
}
@StartableByRPC
public static class FlowWithExternalOperationInJava extends FlowWithExternalProcess {
private static Logger log = LoggerFactory.getLogger(FlowWithExternalOperationInJava.class);
public FlowWithExternalOperationInJava(Party party) {
super(party);
}
@NotNull
@Override
@Suspendable
public Object testCode() {
return await(
new ExternalOperation<>(
getServiceHub().cordaService(FutureService.class),
(BiFunction<FutureService, String, Object> & Serializable) (futureService, deduplicationId) -> {
log.info("Inside of background process - " + deduplicationId);
return "Background process completed - (" + deduplicationId + ")";
}
)
);
}
}
@StartableByRPC
public static class FlowWithExternalAsyncOperationInJava extends FlowWithExternalProcess {
public FlowWithExternalAsyncOperationInJava(Party party) {
super(party);
}
@NotNull
@Override
@Suspendable
public Object testCode() {
return await(new ExternalAsyncOperation<>(
getServiceHub().cordaService(FutureService.class),
(BiFunction<FutureService, String, CompletableFuture<Object>> & Serializable) (futureService, deduplicationId) ->
futureService.createFuture()
));
}
}
@StartableByRPC
public static class FlowWithExternalOperationThatGetsRetriedInJava extends FlowWithExternalProcess {
private static boolean flag = false;
public FlowWithExternalOperationThatGetsRetriedInJava(Party party) {
super(party);
}
@NotNull
@Override
@Suspendable
public Object testCode() {
return await(
new ExternalOperation<>(
getServiceHub().cordaService(FutureService.class),
(BiFunction<FutureService, String, Object> & Serializable) (futureService, deduplicationId) -> {
if (!flag) {
flag = true;
return futureService.throwHospitalHandledException();
} else {
return "finished";
}
}
)
);
}
}
public static class ExternalAsyncOperation<R> implements FlowExternalAsyncOperation<R> {
private FutureService futureService;
private BiFunction<FutureService, String, CompletableFuture<R>> operation;
public ExternalAsyncOperation(FutureService futureService, BiFunction<FutureService, String, CompletableFuture<R>> operation) {
this.futureService = futureService;
this.operation = operation;
}
@NotNull
@Override
public CompletableFuture<R> execute(@NotNull String deduplicationId) {
return operation.apply(futureService, deduplicationId);
}
}
public static class ExternalOperation<R> implements FlowExternalOperation<R> {
private FutureService futureService;
private BiFunction<FutureService, String, R> operation;
public ExternalOperation(FutureService futureService, BiFunction<FutureService, String, R> operation) {
this.futureService = futureService;
this.operation = operation;
}
@NotNull
@Override
public R execute(@NotNull String deduplicationId) {
return operation.apply(futureService, deduplicationId);
}
}
}

View File

@ -0,0 +1,233 @@
package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaException
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowExternalOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StartableByService
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.doOnComplete
import net.corda.core.node.AppServiceHub
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.StaffedFlowHospital
import java.sql.SQLTransientConnectionException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.function.Supplier
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Table
abstract class AbstractFlowExternalOperationTest {
@StartableByRPC
@InitiatingFlow
@StartableByService
open class FlowWithExternalProcess(val party: Party) : FlowLogic<Any>() {
companion object {
val log = contextLogger()
}
@Suspendable
override fun call(): Any {
log.info("Started my flow")
val result = testCode()
val session = initiateFlow(party)
session.send("hi there")
log.info("ServiceHub value = $serviceHub")
session.receive<String>()
log.info("Finished my flow")
return result
}
@Suspendable
open fun testCode(): Any = await(ExternalOperation(serviceHub) { _, deduplicationId ->
log.info("Inside of background process - $deduplicationId")
"Background process completed - ($deduplicationId)"
})
}
@InitiatedBy(FlowWithExternalProcess::class)
class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>()
session.send("go away")
}
}
@CordaService
class FutureService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
companion object {
val log = contextLogger()
}
private val executorService = Executors.newFixedThreadPool(8)
private val deduplicationIds = mutableSetOf<String>()
fun createFuture(): CompletableFuture<Any> {
return CompletableFuture.supplyAsync(Supplier<Any> {
log.info("Starting sleep inside of future")
Thread.sleep(2000)
log.info("Finished sleep inside of future")
"Here is your return value"
}, executorService)
}
fun createExceptionFutureWithDeduplication(deduplicationId: String): CompletableFuture<Any> {
return CompletableFuture.supplyAsync(
Supplier<Any> { createExceptionWithDeduplication(deduplicationId) },
executorService
)
}
fun createExceptionWithDeduplication(deduplicationId: String): Any {
log.info("Creating future")
if (deduplicationId !in deduplicationIds) {
deduplicationIds += deduplicationId
throw SQLTransientConnectionException("fake exception - connection is not available")
}
throw DuplicatedProcessException(deduplicationId)
}
fun setHospitalHandledException(): CompletableFuture<Any> = CompletableFuture<Any>().apply {
completeExceptionally(SQLTransientConnectionException("fake exception - connection is not available"))
}
fun throwHospitalHandledException(): Nothing = throw SQLTransientConnectionException("fake exception - connection is not available")
fun startMultipleFuturesAndJoin(): CompletableFuture<List<Any>> {
return CompletableFuture.supplyAsync(
Supplier<List<Any>> {
log.info("Creating multiple futures")
(1..5).map { createFuture().getOrThrow() }
},
executorService
)
}
fun startFlow(party: Party): CompletableFuture<Any> {
return CompletableFuture.supplyAsync(
Supplier<Any> {
log.info("Starting new flow")
services.startFlow(FlowWithExternalProcess(party)).returnValue
.doOnComplete { log.info("Finished new flow") }.get()
},
executorService
)
}
fun startFlows(party: Party): CompletableFuture<List<Any>> {
return CompletableFuture.supplyAsync(
Supplier<List<Any>> {
log.info("Starting new flows")
(1..5).map { i ->
services.startFlow(FlowWithExternalProcess(party))
.returnValue
.doOnComplete { log.info("Finished new flow $i") }
.getOrThrow()
}
},
executorService
)
}
fun readFromDatabase(name: String): CustomTableEntity? = services.withEntityManager { find(CustomTableEntity::class.java, name) }
fun saveToDatabaseWithEntityManager(entity: CustomTableEntity): Unit = services.withEntityManager {
persist(entity)
}
fun saveToDatabaseWithJdbcSession(entity: CustomTableEntity): Unit = services.database.transaction {
services.jdbcSession()
.createStatement()
.execute("INSERT INTO custom_table (name, quote) VALUES ('${entity.name}', '${entity.quote}');")
}
fun saveToDatabaseWithDatabaseTransaction(entity: CustomTableEntity): Unit = services.database.transaction {
session.save(entity)
}
fun throwExceptionInsideOfDatabaseTransaction(): Nothing = services.database.transaction {
throw SQLTransientConnectionException("connection is not available")
}
}
@Entity
@Table(name = "custom_table")
data class CustomTableEntity constructor(
@Id
@Column(name = "name", nullable = false)
var name: String,
@Column(name = "quote", nullable = false)
var quote: String
)
object CustomSchema
object CustomMappedSchema : MappedSchema(CustomSchema::class.java, 1, listOf(CustomTableEntity::class.java))
// Internal use for testing only!!
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter
)
}
@CordaSerializable
data class HospitalCounts(val discharge: Int, val observation: Int)
@Suppress("UNUSED_PARAMETER")
@CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
var observationCounter: Int = 0
var dischargeCounter: Int = 0
init {
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargeCounter }
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
}
}
class MyCordaException(message: String) : CordaException(message)
class DirectlyAccessedServiceHubException : CordaException("Null pointer from accessing flow's serviceHub")
class DuplicatedProcessException(private val deduplicationId: String) : CordaException("Duplicated process: $deduplicationId")
class ExternalOperation<R : Any>(
private val serviceHub: ServiceHub,
private val operation: (serviceHub: ServiceHub, deduplicationId: String) -> R
) : FlowExternalOperation<R> {
override fun execute(deduplicationId: String): R {
return operation(serviceHub, deduplicationId)
}
}
class ExternalAsyncOperation<R : Any>(
private val serviceHub: ServiceHub,
private val function: (serviceHub: ServiceHub, deduplicationId: String) -> CompletableFuture<R>
) : FlowExternalAsyncOperation<R> {
override fun execute(deduplicationId: String): CompletableFuture<R> {
return function(serviceHub, deduplicationId)
}
}
}

View File

@ -0,0 +1,294 @@
package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.DirectlyAccessedServiceHubException
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.ExternalAsyncOperation
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FlowWithExternalProcess
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FutureService
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.MyCordaException
import net.corda.node.services.statemachine.StateTransitionException
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Test
import java.sql.SQLTransientConnectionException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test
fun `external async operation`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
alice.rpc.startFlow(::FlowWithExternalAsyncOperation, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(20.seconds)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external async operation that checks deduplicationId is not rerun when flow is retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<DuplicatedProcessException> {
alice.rpc.startFlow(
::FlowWithExternalAsyncOperationWithDeduplication,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(1, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external async operation propagates exception to calling flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<MyCordaException> {
alice.rpc.startFlow(
::FlowWithExternalAsyncOperationPropagatesException,
bob.nodeInfo.singleIdentity(),
MyCordaException::class.java
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external async operation exception can be caught in flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val result = alice.rpc.startFlow(
::FlowWithExternalAsyncOperationThatThrowsExceptionAndCaughtInFlow,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
assertTrue(result as Boolean)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external async operation with exception that hospital keeps for observation does not fail`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
::FlowWithExternalAsyncOperationPropagatesException,
bob.nodeInfo.singleIdentity(),
HospitalizeFlowException::class.java
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(1, observation)
}
}
@Test
fun `external async operation with exception that hospital discharges is retried and runs the future again`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
::FlowWithExternalAsyncOperationPropagatesException,
bob.nodeInfo.singleIdentity(),
SQLTransientConnectionException::class.java
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(3, discharged)
assertEquals(1, observation)
}
}
@Test
fun `external async operation that throws exception rather than completing future exceptionally fails with internal exception`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<StateTransitionException> {
alice.rpc.startFlow(::FlowWithExternalAsyncOperationUnhandledException, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external async operation that passes serviceHub into process can be retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
::FlowWithExternalAsyncOperationThatPassesInServiceHubCanRetry,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(3, discharged)
assertEquals(1, observation)
}
}
@Test
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<DirectlyAccessedServiceHubException> {
alice.rpc.startFlow(
::FlowWithExternalAsyncOperationThatDirectlyAccessesServiceHubFailsRetry,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(1, discharged)
assertEquals(0, observation)
}
}
@Test
fun `starting multiple futures and joining on their results`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
alice.rpc.startFlow(::FlowThatStartsMultipleFuturesAndJoins, bob.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@StartableByRPC
class FlowWithExternalAsyncOperation(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { _, _ ->
serviceHub.cordaService(FutureService::class.java).createFuture()
})
}
@StartableByRPC
class FlowWithExternalAsyncOperationPropagatesException<T>(party: Party, private val exceptionType: Class<T>) :
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { _, _ ->
CompletableFuture<Any>().apply {
completeExceptionally(createException())
}
})
private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")
SQLTransientConnectionException::class.java -> SQLTransientConnectionException("fake exception - connection is not available")
else -> MyCordaException("boom")
}
}
@StartableByRPC
class FlowWithExternalAsyncOperationThatThrowsExceptionAndCaughtInFlow(party: Party) :
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any = try {
await(ExternalAsyncOperation(serviceHub) { _, _ ->
CompletableFuture<Any>().apply {
completeExceptionally(IllegalStateException("threw exception in external async operation"))
}
})
} catch (e: IllegalStateException) {
log.info("Exception was caught")
true
}
}
@StartableByRPC
class FlowWithExternalAsyncOperationUnhandledException(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { _, _ -> throw MyCordaException("threw exception in external async operation") })
}
@StartableByRPC
class FlowWithExternalAsyncOperationThatPassesInServiceHubCanRetry(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).throwHospitalHandledException()
})
}
@StartableByRPC
class FlowWithExternalAsyncOperationThatDirectlyAccessesServiceHubFailsRetry(party: Party) : FlowWithExternalProcess(party) {
@Suppress("TooGenericExceptionCaught")
@Suspendable
override fun testCode(): Any {
return await(ExternalAsyncOperation(serviceHub) { _, _ ->
try {
serviceHub.cordaService(FutureService::class.java).setHospitalHandledException()
} catch (e: NullPointerException) {
// Catch the [NullPointerException] thrown from accessing the flow's [ServiceHub]
// set the future so that the exception can be asserted from the test
CompletableFuture<Any>().apply { completeExceptionally(DirectlyAccessedServiceHubException()) }
}
})
}
}
@StartableByRPC
class FlowWithExternalAsyncOperationWithDeduplication(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any {
return await(ExternalAsyncOperation(serviceHub) { serviceHub, deduplicationId ->
serviceHub.cordaService(FutureService::class.java).createExceptionFutureWithDeduplication(deduplicationId)
})
}
}
@StartableByRPC
class FlowThatStartsMultipleFuturesAndJoins(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalAsyncOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).startMultipleFuturesAndJoin()
}.also { log.info("Result - $it") })
}
}

View File

@ -0,0 +1,70 @@
package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Test
import kotlin.test.assertEquals
class FlowExternalOperationStartFlowTest : AbstractFlowExternalOperationTest() {
@Test
fun `starting a flow inside of a flow that starts a future will succeed`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
alice.rpc.startFlow(::FlowThatStartsAnotherFlowInAnExternalOperation, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(40.seconds)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `multiple flows can be started and their futures joined from inside a flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
alice.rpc.startFlow(::ForkJoinFlows, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(40.seconds)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@StartableByRPC
class FlowThatStartsAnotherFlowInAnExternalOperation(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any {
return await(
ExternalAsyncOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).startFlow(party)
}.also { log.info("Result - $it") }
)
}
}
@StartableByRPC
class ForkJoinFlows(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any {
return await(
ExternalAsyncOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).startFlows(party)
}.also { log.info("Result - $it") }
)
}
}
}

View File

@ -0,0 +1,438 @@
package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.queryBy
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.CustomTableEntity
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.DirectlyAccessedServiceHubException
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.ExternalOperation
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FlowWithExternalProcess
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FutureService
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.MyCordaException
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.cordappsForPackages
import org.junit.Test
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test
fun `external operation`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
alice.rpc.startFlow(::FlowWithExternalOperation, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(20.seconds)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external operation that checks deduplicationId is not rerun when flow is retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<DuplicatedProcessException> {
alice.rpc.startFlow(
::FlowWithExternalOperationWithDeduplication,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(1, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external operation propagates exception to calling flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<MyCordaException> {
alice.rpc.startFlow(
::FlowWithExternalOperationPropagatesException,
bob.nodeInfo.singleIdentity(),
MyCordaException::class.java
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external operation exception can be caught in flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
alice.rpc.startFlow(::FlowWithExternalOperationThatThrowsExceptionAndCaughtInFlow, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(20.seconds)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(0, observation)
}
}
@Test
fun `external operation with exception that hospital keeps for observation does not fail`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
::FlowWithExternalOperationPropagatesException,
bob.nodeInfo.singleIdentity(),
HospitalizeFlowException::class.java
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(0, discharged)
assertEquals(1, observation)
}
}
@Test
fun `external operation with exception that hospital discharges is retried and runs the external operation again`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
::FlowWithExternalOperationPropagatesException,
bob.nodeInfo.singleIdentity(),
SQLTransientConnectionException::class.java
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(3, discharged)
assertEquals(1, observation)
}
}
@Test
fun `external async operation that passes serviceHub into process can be retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
::FlowWithExternalOperationThatPassesInServiceHubCanRetry,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(3, discharged)
assertEquals(1, observation)
}
}
@Test
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
assertFailsWith<DirectlyAccessedServiceHubException> {
alice.rpc.startFlow(
::FlowWithExternalOperationThatDirectlyAccessesServiceHubFailsRetry,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
}
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(1, discharged)
assertEquals(0, observation)
}
}
@Test
fun `vault can be queried`() {
driver(
DriverParameters(
cordappsForAllNodes = cordappsForPackages(DummyState::class.packageName),
startNodesInProcess = true
)
) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val success = alice.rpc.startFlow(::FlowWithWithExternalOperationThatQueriesVault)
.returnValue.getOrThrow(20.seconds)
assertTrue(success)
}
}
@Test
fun `data can be persisted to node database via entity manager`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsViaEntityManager)
.returnValue.getOrThrow(20.seconds)
assertTrue(success)
}
}
@Test
fun `data can be persisted to node database via jdbc session`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsViaJdbcSession)
.returnValue.getOrThrow(20.seconds)
assertTrue(success)
}
}
@Test
fun `data can be persisted to node database via servicehub database transaction`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsViaDatabaseTransaction)
.returnValue.getOrThrow(20.seconds)
assertTrue(success)
}
}
@Test
fun `data can be persisted to node database in external operation and read from another process once finished`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsToDatabaseAndReadsFromExternalOperation)
.returnValue.getOrThrow(20.seconds)
assertTrue(success)
}
}
@Test
fun `external operation can be retried when an error occurs inside of database transaction`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val success = alice.rpc.startFlow(
::FlowWithExternalOperationThatErrorsInsideOfDatabaseTransaction,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(20.seconds)
assertTrue(success as Boolean)
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
assertEquals(1, discharged)
assertEquals(0, observation)
}
}
@StartableByRPC
class FlowWithExternalOperation(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any = await(ExternalOperation(serviceHub) { _, _ -> "please return my message" })
}
@StartableByRPC
class FlowWithExternalOperationWithDeduplication(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any {
return await(ExternalOperation(serviceHub) { serviceHub, deduplicationId ->
serviceHub.cordaService(FutureService::class.java).createExceptionWithDeduplication(deduplicationId)
})
}
}
@StartableByRPC
class FlowWithExternalOperationPropagatesException<T>(party: Party, private val exceptionType: Class<T>) :
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any = await(ExternalOperation(serviceHub) { _, _ -> throw createException() })
private fun createException() = when (exceptionType) {
HospitalizeFlowException::class.java -> HospitalizeFlowException("keep it around")
SQLTransientConnectionException::class.java -> SQLTransientConnectionException("fake exception - connection is not available")
else -> MyCordaException("boom")
}
}
@StartableByRPC
class FlowWithExternalOperationThatThrowsExceptionAndCaughtInFlow(party: Party) :
FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any = try {
await(ExternalOperation(serviceHub) { _, _ ->
throw IllegalStateException("threw exception in background process")
})
} catch (e: IllegalStateException) {
log.info("Exception was caught")
"Exception was caught"
}
}
@StartableByRPC
class FlowWithExternalOperationThatPassesInServiceHubCanRetry(party: Party) : FlowWithExternalProcess(party) {
@Suspendable
override fun testCode(): Any =
await(ExternalOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).throwHospitalHandledException()
})
}
@StartableByRPC
class FlowWithExternalOperationThatDirectlyAccessesServiceHubFailsRetry(party: Party) : FlowWithExternalProcess(party) {
@Suppress("TooGenericExceptionCaught")
@Suspendable
override fun testCode(): Any {
try {
await(ExternalOperation(serviceHub) { _, _ ->
serviceHub.cordaService(FutureService::class.java).throwHospitalHandledException()
})
} catch (e: NullPointerException) {
throw DirectlyAccessedServiceHubException()
}
}
}
@StartableByRPC
class FlowWithWithExternalOperationThatQueriesVault : FlowLogic<Boolean>() {
@Suspendable
override fun call(): Boolean {
val state = DummyState(1, listOf(ourIdentity))
val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
addOutputState(state)
addCommand(DummyContract.Commands.Create(), listOf(ourIdentity.owningKey))
}
val stx = serviceHub.signInitialTransaction(tx)
serviceHub.recordTransactions(stx)
return await(ExternalOperation(serviceHub) { serviceHub, _ ->
serviceHub.vaultService.queryBy<DummyState>().states.single().state.data == state
})
}
}
abstract class FlowWithExternalOperationThatPersistsToDatabase : FlowLogic<Boolean>() {
@Suspendable
override fun call(): Boolean {
val (entityOne, entityTwo, entityThree) = saveToDatabase()
return serviceHub.cordaService(FutureService::class.java).readFromDatabase(entityOne.name) == entityOne &&
serviceHub.cordaService(FutureService::class.java).readFromDatabase(entityTwo.name) == entityTwo &&
serviceHub.cordaService(FutureService::class.java).readFromDatabase(entityThree.name) == entityThree
}
@Suspendable
abstract fun saveToDatabase(): Triple<CustomTableEntity, CustomTableEntity, CustomTableEntity>
}
@StartableByRPC
class FlowWithExternalOperationThatPersistsViaEntityManager : FlowWithExternalOperationThatPersistsToDatabase() {
@Suspendable
override fun saveToDatabase(): Triple<CustomTableEntity, CustomTableEntity, CustomTableEntity> {
val entityOne = CustomTableEntity("Darth Vader", "I find your lack of faith disturbing.")
val entityTwo = CustomTableEntity("Obi-Wan Kenobi", "The Force will be with you. Always.")
val entityThree = CustomTableEntity("Admiral Ackbar", "Its a trap!")
await(ExternalOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithEntityManager(entityOne)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithEntityManager(entityTwo)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithEntityManager(entityThree)
})
return Triple(entityOne, entityTwo, entityThree)
}
}
@StartableByRPC
class FlowWithExternalOperationThatPersistsViaJdbcSession : FlowWithExternalOperationThatPersistsToDatabase() {
@Suspendable
override fun saveToDatabase(): Triple<CustomTableEntity, CustomTableEntity, CustomTableEntity> {
val entityOne = CustomTableEntity("Tony Stark", "I am Iron Man.")
val entityTwo = CustomTableEntity("Captain America", "I can do this all day.")
val entityThree = CustomTableEntity("Hulk", "Puny god.")
await(ExternalOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithJdbcSession(entityOne)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithJdbcSession(entityTwo)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithJdbcSession(entityThree)
})
return Triple(entityOne, entityTwo, entityThree)
}
}
@StartableByRPC
class FlowWithExternalOperationThatPersistsViaDatabaseTransaction : FlowWithExternalOperationThatPersistsToDatabase() {
@Suspendable
override fun saveToDatabase(): Triple<CustomTableEntity, CustomTableEntity, CustomTableEntity> {
val entityOne = CustomTableEntity("Groot", "We are Groot.")
val entityTwo = CustomTableEntity("Drax", "Nothing goes over my head. My reflexes are too fast. I would catch it.")
val entityThree = CustomTableEntity("Doctor Strange", "Dormammu, Ive come to bargain.")
await(ExternalOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithDatabaseTransaction(entityOne)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithDatabaseTransaction(entityTwo)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithDatabaseTransaction(entityThree)
})
return Triple(entityOne, entityTwo, entityThree)
}
}
@StartableByRPC
class FlowWithExternalOperationThatPersistsToDatabaseAndReadsFromExternalOperation : FlowLogic<Boolean>() {
@Suspendable
override fun call(): Boolean {
val entityOne = CustomTableEntity("Emperor Palpatine", "Now, young Skywalker, you will die.")
val entityTwo = CustomTableEntity("Yoda", "My ally is the Force, and a powerful ally it is.")
val entityThree = CustomTableEntity("Han Solo", "Never tell me the odds!")
await(ExternalOperation(serviceHub) { serviceHub, _ ->
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithEntityManager(entityOne)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithJdbcSession(entityTwo)
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithDatabaseTransaction(entityThree)
})
return await(ExternalOperation(serviceHub) { serviceHub, _ ->
return@ExternalOperation serviceHub.cordaService(FutureService::class.java).readFromDatabase(entityOne.name) == entityOne &&
serviceHub.cordaService(FutureService::class.java).readFromDatabase(entityTwo.name) == entityTwo &&
serviceHub.cordaService(FutureService::class.java).readFromDatabase(entityThree.name) == entityThree
})
}
}
@StartableByRPC
class FlowWithExternalOperationThatErrorsInsideOfDatabaseTransaction(party: Party) : FlowWithExternalProcess(party) {
private companion object {
var flag = false
}
@Suspendable
override fun testCode(): Boolean {
return await(ExternalOperation(serviceHub) { serviceHub, _ ->
if (!flag) {
flag = true
serviceHub.cordaService(FutureService::class.java).throwExceptionInsideOfDatabaseTransaction()
} else {
val entity = CustomTableEntity("Emperor Palpatine", "Now, young Skywalker, you will die.")
serviceHub.cordaService(FutureService::class.java).saveToDatabaseWithDatabaseTransaction(entity)
return@ExternalOperation serviceHub.cordaService(FutureService::class.java).readFromDatabase(entity.name) != null
}
})
}
}
}

View File

@ -0,0 +1,67 @@
package net.corda.core.flows
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.node.ServiceHub
import java.util.concurrent.CompletableFuture
/**
* [FlowExternalAsyncOperation] represents an external future that blocks a flow from continuing until the future returned by
* [FlowExternalAsyncOperation.execute] has completed. Examples of external processes where [FlowExternalAsyncOperation] would be useful
* include, triggering a long running process on an external system or retrieving information from a service that might be down.
*
* The flow will suspend while it is blocked to free up a flow worker thread, which allows other flows to continue processing while waiting
* for the result of this process.
*
* Implementations of [FlowExternalAsyncOperation] should ideally hold references to any external values required by [execute]. These
* references should be passed into the implementation's constructor. For example, an amount or a reference to a Corda Service could be
* passed in.
*
* It is discouraged to insert into the node's database from a [FlowExternalAsyncOperation], except for keeping track of [deduplicationId]s
* that have been processed. It is possible to interact with the database from inside a [FlowExternalAsyncOperation] but, for most
* operations, is not currently supported.
*/
interface FlowExternalAsyncOperation<R : Any> {
/**
* Executes a future.
*
* The future created and returned from [execute] must handle its own threads. If a new thread is not spawned or taken from a thread
* pool, then the flow worker thread will be used. This removes any benefit from using an [FlowExternalAsyncOperation].
*
* @param deduplicationId If the flow restarts from a checkpoint (due to node restart, or via a visit to the flow
* hospital following an error) the execute method might be called more than once by the Corda flow state machine.
* For each duplicate call, the deduplicationId is guaranteed to be the same allowing duplicate requests to be
* de-duplicated if necessary inside the execute method.
*/
fun execute(deduplicationId: String): CompletableFuture<R>
}
/**
* [FlowExternalOperation] represents an external process that blocks a flow from continuing until the result of [execute]
* has been retrieved. Examples of external processes where [FlowExternalOperation] would be useful include, triggering a long running
* process on an external system or retrieving information from a service that might be down.
*
* The flow will suspend while it is blocked to free up a flow worker thread, which allows other flows to continue processing while waiting
* for the result of this process.
*
* Implementations of [FlowExternalOperation] should ideally hold references to any external values required by [execute]. These references
* should be passed into the implementation's constructor. For example, an amount or a reference to a Corda Service could be passed in.
*
* It is discouraged to insert into the node's database from a [FlowExternalOperation], except for keeping track of [deduplicationId]s that
* have been processed. It is possible to interact with the database from inside a [FlowExternalOperation] but, for most operations, is not
* currently supported.
*/
interface FlowExternalOperation<R : Any> {
/**
* Executes a blocking operation.
*
* The execution of [execute] will be run on a thread from the node's external process thread pool when called by [FlowLogic.await].
*
* @param deduplicationId If the flow restarts from a checkpoint (due to node restart, or via a visit to the flow
* hospital following an error) the execute method might be called more than once by the Corda flow state machine.
* For each duplicate call, the deduplicationId is guaranteed to be the same allowing duplicate requests to be
* de-duplicated if necessary inside the execute method.
*/
fun execute(deduplicationId: String): R
}

View File

@ -4,13 +4,22 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.WaitForStateConsumption
import net.corda.core.internal.abbreviate
import net.corda.core.internal.checkPayloadIs
import net.corda.core.internal.concurrent.asCordaFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
@ -24,7 +33,10 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.toNonEmptySet
import org.slf4j.Logger
import java.time.Duration
import java.util.*
import java.util.HashMap
import java.util.LinkedHashMap
import java.util.concurrent.CompletableFuture
import java.util.function.Supplier
/**
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
@ -432,7 +444,12 @@ abstract class FlowLogic<out T> {
* @param stateRefs the StateRefs which will be consumed in the future.
*/
@Suspendable
fun waitForStateConsumption(stateRefs: Set<StateRef>) = executeAsync(WaitForStateConsumption(stateRefs, serviceHub))
fun waitForStateConsumption(stateRefs: Set<StateRef>) {
// Manually call the equivalent of [await] to remove extra wrapping of objects
// Makes serializing of object easier for [CheckpointDumper] as well
val request = FlowIORequest.ExecuteAsyncOperation(WaitForStateConsumption(stateRefs, serviceHub))
return stateMachine.suspend(request, false)
}
/**
* Returns a shallow copy of the Quasar stack frames at the time of call to [flowStackSnapshot]. Use this to inspect
@ -503,6 +520,72 @@ abstract class FlowLogic<out T> {
private fun <R> castMapValuesToKnownType(map: Map<FlowSession, UntrustworthyData<Any>>): List<UntrustworthyData<R>> {
return map.values.map { uncheckedCast<Any, UntrustworthyData<R>>(it) }
}
/**
* Executes the specified [operation] and suspends until operation completion.
*
* An implementation of [FlowExternalAsyncOperation] should be provided that creates a new future that the state machine awaits
* completion of.
*
*/
@Suspendable
fun <R : Any> await(operation: FlowExternalAsyncOperation<R>): R {
// Wraps the passed in [FlowExternalAsyncOperation] so its [CompletableFuture] can be converted into a [CordaFuture]
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalAsyncOperation<R> {
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
return this.operation.execute(deduplicationId).asCordaFuture()
}
}
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
/**
* Executes the specified [operation] and suspends until operation completion.
*
* An implementation of [FlowExternalOperation] should be provided that returns a result which the state machine will run on a separate
* thread (using the node's external operation thread pool).
*
*/
@Suspendable
fun <R : Any> await(operation: FlowExternalOperation<R>): R {
val flowAsyncOperation = object : FlowAsyncOperation<R>, WrappedFlowExternalOperation<R> {
override val serviceHub = this@FlowLogic.serviceHub as ServiceHubCoreInternal
override val operation = operation
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
}
/**
* [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalAsyncOperation].
*/
private interface WrappedFlowExternalAsyncOperation<R : Any> {
val operation: FlowExternalAsyncOperation<R>
}
/**
* [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalOperation].
*
* The reference to [ServiceHub] is is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a
* flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow.
*/
private interface WrappedFlowExternalOperation<R : Any> {
val serviceHub: ServiceHub
val operation: FlowExternalOperation<R>
}
/**

View File

@ -5,7 +5,6 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
// DOCSTART FlowAsyncOperation
/**
* Interface for arbitrary operations that can be invoked in a flow asynchronously - the flow will suspend until the
* operation completes. Operation parameters are expected to be injected via constructor.
@ -21,13 +20,14 @@ interface FlowAsyncOperation<R : Any> {
*/
fun execute(deduplicationId: String): CordaFuture<R>
}
// DOCEND FlowAsyncOperation
// DOCSTART executeAsync
/** Executes the specified [operation] and suspends until operation completion. */
@Deprecated(
"This has been replaced by [FlowLogic.await] that provides an improved and public API",
ReplaceWith("net.corda.core.flows.FlowLogic.await")
)
@Suspendable
fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, maySkipCheckpoint: Boolean = false): R {
val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint)
}
// DOCEND executeAsync

View File

@ -4,11 +4,14 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DeleteForDJVM
import net.corda.core.node.ServiceHub
import net.corda.core.node.StatesToRecord
import java.util.concurrent.ExecutorService
// TODO: This should really be called ServiceHubInternal but that name is already taken by net.corda.node.services.api.ServiceHubInternal.
@DeleteForDJVM
interface ServiceHubCoreInternal : ServiceHub {
val externalOperationExecutor: ExecutorService
val attachmentTrustCalculator: AttachmentTrustCalculator
fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver

View File

@ -27,6 +27,17 @@ fun <V, W, X> CordaFuture<out V>.thenMatch(success: (V) -> W, failure: (Throwabl
/** When this future is done and the outcome is failure, log the throwable. */
fun CordaFuture<*>.andForget(log: Logger) = thenMatch({}, { log.error("Background task failed:", it) })
fun <RESULT> CordaFuture<out RESULT>.doOnComplete(accept: (RESULT) -> Unit): CordaFuture<RESULT> {
return CordaFutureImpl<RESULT>().also { result ->
thenMatch({
accept(it)
result.capture { it }
}, {
result.setException(it)
})
}
}
/**
* Returns a future that will have an outcome of applying the given transform to this future's value.
* But if this future fails, the transform is not invoked and the returned future becomes done with the same throwable.

View File

@ -1,7 +1,6 @@
package net.corda.core.internal.notary
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.Crypto
@ -9,16 +8,16 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.executeAsync
import net.corda.core.internal.notary.UniquenessProvider.Result
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.contextLogger
import org.slf4j.Logger
import java.time.Duration
import java.util.concurrent.CompletableFuture
/** Base implementation for a notary service operated by a singe party. */
abstract class SinglePartyNotaryService : NotaryService() {
@ -48,7 +47,7 @@ abstract class SinglePartyNotaryService : NotaryService() {
val callingFlow = FlowLogic.currentTopLevel
?: throw IllegalStateException("This method should be invoked in a flow context.")
val result = callingFlow.executeAsync(
val result = callingFlow.await(
CommitOperation(
this,
inputs,
@ -87,10 +86,10 @@ abstract class SinglePartyNotaryService : NotaryService() {
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val references: List<StateRef>
) : FlowAsyncOperation<Result> {
) : FlowExternalAsyncOperation<Result> {
override fun execute(deduplicationId: String): CordaFuture<Result> {
return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references)
override fun execute(deduplicationId: String): CompletableFuture<Result> {
return service.uniquenessProvider.commit(inputs, txId, caller, requestSignature, timeWindow, references).toCompletableFuture()
}
}

View File

@ -880,15 +880,577 @@ We then update the progress tracker's current step as we progress through the fl
:end-before: DOCEND 18
:dedent: 12
HTTP and database calls
-----------------------
HTTP, database and other calls to external resources are allowed in flows. However, their support is currently limited:
.. _api_flows_external_operations:
* The call must be executed in a BLOCKING way. Flows don't currently support suspending to await the response to a call to an external resource
Calling external systems inside of flows
------------------------------------------
Flows provide the ability to await the result of an external operation running outside of the context of a flow. A flow will suspend while
awaiting a result. This frees up a flow worker thread to continuing processing other flows.
* For this reason, the call should be provided with a timeout to prevent the flow from suspending forever. If the timeout elapses, this should be treated as a soft failure and handled by the flow's business logic
.. note::
* The call must be idempotent. If the flow fails and has to restart from a checkpoint, the call will also be replayed
Flow worker threads belong to the thread pool that executes flows.
Examples of where this functionality is useful include:
* Triggering a long running process on an external system
* Retrieving information from a external service that might go down
``FlowLogic`` provides two ``await`` functions that allow custom operations to be defined and executed outside of the context of a flow.
Below are the interfaces that must be implemented and passed into ``await``, along with brief descriptions of what they do:
* ``FlowExternalOperation`` - An operation that returns a result which should be run using a thread from one of the node's
thread pools.
* ``FlowExternalAsyncOperation`` - An operation that returns a future which should be run on a thread provided to its implementation.
Threading needs to be explicitly handled when using ``FlowExternalAsyncOperation``.
FlowExternalOperation
^^^^^^^^^^^^^^^^^^^^^
``FlowExternalOperation`` allows developers to write an operation that will run on a thread provided by the node's flow external operation
thread pool.
.. note::
The size of the external operation thread pool can be configured, see :ref:`the node configuration documentation <corda_configuration_flow_external_operation_thread_pool_size>`.
Below is an example of how ``FlowExternalOperation`` can be called from a flow to run an operation on a new thread, allowing the flow to suspend:
.. container:: codeset
.. sourcecode:: kotlin
@StartableByRPC
class FlowUsingFlowExternalOperation : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// Other flow operations
// Call [FlowLogic.await] to execute an external operation
// The result of the operation is returned to the flow
val response: Response = await(
// Pass in an implementation of [FlowExternalOperation]
RetrieveDataFromExternalSystem(
serviceHub.cordaService(ExternalService::class.java),
Data("amount", 1)
)
)
// Other flow operations
}
class RetrieveDataFromExternalSystem(
private val externalService: ExternalService,
private val data: Data
) : FlowExternalOperation<Response> {
// Implement [execute] which will be run on a thread outside of the flow's context
override fun execute(deduplicationId: String): Response {
return externalService.retrieveDataFromExternalSystem(deduplicationId, data)
}
}
}
@CordaService
class ExternalService(serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
private val client: OkHttpClient = OkHttpClient()
fun retrieveDataFromExternalSystem(deduplicationId: String, data: Data): Response {
return try {
// [DeduplicationId] passed into the request so the external system can handle deduplication
client.newCall(
Request.Builder().url("https://externalsystem.com/endpoint/$deduplicationId").post(
RequestBody.create(
MediaType.parse("text/plain"), data.toString()
)
).build()
).execute()
} catch (e: IOException) {
// Handle checked exception
throw HospitalizeFlowException("External API call failed", e)
}
}
}
data class Data(val name: String, val value: Any)
.. sourcecode:: java
@StartableByRPC
public class FlowUsingFlowExternalOperation extends FlowLogic<Void> {
@Override
@Suspendable
public Void call() {
// Other flow operations
// Call [FlowLogic.await] to execute an external operation
// The result of the operation is returned to the flow
Response response = await(
// Pass in an implementation of [FlowExternalOperation]
new RetrieveDataFromExternalSystem(
getServiceHub().cordaService(ExternalService.class),
new Data("amount", 1)
)
);
// Other flow operations
return null;
}
public class RetrieveDataFromExternalSystem implements FlowExternalOperation<Response> {
private ExternalService externalService;
private Data data;
public RetrieveDataFromExternalSystem(ExternalService externalService, Data data) {
this.externalService = externalService;
this.data = data;
}
// Implement [execute] which will be run on a thread outside of the flow's context
@Override
public Response execute(String deduplicationId) {
return externalService.retrieveDataFromExternalSystem(deduplicationId, data);
}
}
}
@CordaService
public class ExternalService extends SingletonSerializeAsToken {
private OkHttpClient client = new OkHttpClient();
public ExternalService(AppServiceHub serviceHub) { }
public Response retrieveDataFromExternalSystem(String deduplicationId, Data data) {
try {
// [DeduplicationId] passed into the request so the external system can handle deduplication
return client.newCall(
new Request.Builder().url("https://externalsystem.com/endpoint/" + deduplicationId).post(
RequestBody.create(
MediaType.parse("text/plain"), data.toString()
)
).build()
).execute();
} catch (IOException e) {
// Must handle checked exception
throw new HospitalizeFlowException("External API call failed", e);
}
}
}
public class Data {
private String name;
private Object value;
public Data(String name, Object value) {
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}
In summary, the following steps are taken in the code above:
* ``ExternalService`` is a Corda service that provides a way to contact an external system (by HTTP in this example).
* ``ExternalService.retrieveDataFromExternalSystem`` is passed a ``deduplicationId`` which is included as part of the request to the
external system. The external system, in this example, will handle deduplication and return the previous result if it was already
computed.
* An implementation of ``FlowExternalOperation`` (``RetrieveDataFromExternalSystem``) is created that calls ``ExternalService.retrieveDataFromExternalSystem``.
* ``RetrieveDataFromExternalSystem`` is then passed into ``await`` to execute the code contained in ``RetrieveDataFromExternalSystem.execute``.
* The result of ``RetrieveDataFromExternalSystem.execute`` is then returned to the flow once its execution finishes.
FlowExternalAsyncOperation
^^^^^^^^^^^^^^^^^^^^^^^^^^
``FlowExternalAsyncOperation`` allows developers to write an operation that returns a future whose threading is handled within the CorDapp.
.. warning::
Threading must be explicitly controlled when using ``FlowExternalAsyncOperation``. A future will be run on its current flow worker
thread if a new thread is not spawned or provided by a thread pool. This prevents the flow worker thread from freeing up and allowing
another flow to take control and run.
Implementations of ``FlowExternalAsyncOperation`` must return a ``CompletableFuture``. How this future is created is up to the developer.
It is recommended to use ``CompletableFuture.supplyAsync`` and supply an executor to run the future on. Other libraries can be used to
generate futures, as long as a ``CompletableFuture`` is returned out of ``FlowExternalAsyncOperation``. An example of creating a future
using :ref:`Guava's ListenableFuture <api_flows_guava_future_conversion>` is given in a following section.
.. note::
The future can be chained to execute further operations that continue using the same thread the future started on. For example,
``CompletableFuture``'s ``whenComplete``, ``exceptionally`` or ``thenApply`` could be used (their async versions are also valid).
Below is an example of how ``FlowExternalAsyncOperation`` can be called from a flow:
.. container:: codeset
.. sourcecode:: kotlin
@StartableByRPC
class FlowUsingFlowExternalAsyncOperation : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// Other flow operations
// Call [FlowLogic.await] to execute an external operation
// The result of the operation is returned to the flow
val response: Response = await(
// Pass in an implementation of [FlowExternalAsyncOperation]
RetrieveDataFromExternalSystem(
serviceHub.cordaService(ExternalService::class.java),
Data("amount", 1)
)
)
// Other flow operations
}
class RetrieveDataFromExternalSystem(
private val externalService: ExternalService,
private val data: Data
) : FlowExternalAsyncOperation<Response> {
// Implement [execute] which needs to be provided with a new thread to benefit from suspending the flow
override fun execute(deduplicationId: String): CompletableFuture<Response> {
return externalService.retrieveDataFromExternalSystem(deduplicationId, data)
}
}
}
@CordaService
class ExternalService(serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
private val client: OkHttpClient = OkHttpClient()
// [ExecutorService] created to provide a fixed number of threads to the futures created in this service
private val executor: ExecutorService = Executors.newFixedThreadPool(
4,
ThreadFactoryBuilder().setNameFormat("external-service-thread").build()
)
fun retrieveDataFromExternalSystem(deduplicationId: String, data: Data): CompletableFuture<Response> {
// Create a [CompletableFuture] to be executed by the [FlowExternalAsyncOperation]
return CompletableFuture.supplyAsync(
Supplier {
try {
// [DeduplicationId] passed into the request so the external system can handle deduplication
client.newCall(
Request.Builder().url("https://externalsystem.com/endpoint/$deduplicationId").post(
RequestBody.create(
MediaType.parse("text/plain"), data.toString()
)
).build()
).execute()
} catch (e: IOException) {
// Handle checked exception
throw HospitalizeFlowException("External API call failed", e)
}
},
// The future must run on a new thread
executor
)
}
}
data class Data(val name: String, val value: Any)
.. sourcecode:: java
@StartableByRPC
public class FlowUsingFlowExternalAsyncOperation extends FlowLogic<Void> {
@Override
@Suspendable
public Void call() {
// Other flow operations
// Call [FlowLogic.await] to execute an external operation
// The result of the operation is returned to the flow
Response response = await(
// Pass in an implementation of [FlowExternalAsyncOperation]
new RetrieveDataFromExternalSystem(
getServiceHub().cordaService(ExternalService.class),
new Data("amount", 1)
)
);
// Other flow operations
return null;
}
public class RetrieveDataFromExternalSystem implements FlowExternalAsyncOperation<Response> {
private ExternalService externalService;
private Data data;
public RetrieveDataFromExternalSystem(ExternalService externalService, Data data) {
this.externalService = externalService;
this.data = data;
}
// Implement [execute] which needs to be provided with a new thread to benefit from suspending the flow
@Override
public CompletableFuture<Response> execute(String deduplicationId) {
return externalService.retrieveDataFromExternalSystem(deduplicationId, data);
}
}
}
@CordaService
public class ExternalService extends SingletonSerializeAsToken {
private OkHttpClient client = new OkHttpClient();
// [ExecutorService] created to provide a fixed number of threads to the futures created in this service
private ExecutorService executor = Executors.newFixedThreadPool(
4,
new ThreadFactoryBuilder().setNameFormat("external-service-thread").build()
);
public ExternalService(AppServiceHub serviceHub) { }
public CompletableFuture<Response> retrieveDataFromExternalSystem(String deduplicationId, Data data) {
// Create a [CompletableFuture] to be executed by the [FlowExternalAsyncOperation]
return CompletableFuture.supplyAsync(
() -> {
try {
// [DeduplicationId] passed into the request so the external system can handle deduplication
return client.newCall(
new Request.Builder().url("https://externalsystem.com/endpoint/" + deduplicationId).post(
RequestBody.create(
MediaType.parse("text/plain"), data.toString()
)
).build()
).execute();
} catch (IOException e) {
// Must handle checked exception
throw new HospitalizeFlowException("External API call failed", e);
}
},
// The future must run on a new thread
executor
);
}
}
public class Data {
private String name;
private Object value;
public Data(String name, Object value) {
this.name = name;
this.value = value;
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
}
In summary, the following steps are taken in the code above:
* ``ExternalService`` is a Corda service that provides a way to contact an external system (by HTTP in this example).
* ``ExternalService.retrieveDataFromExternalSystem`` is passed a ``deduplicationId`` which is included as part of the request to the
external system. The external system, in this example, will handle deduplication and return the previous result if it was already
computed.
* A ``CompletableFuture`` is created that contacts the external system. ``CompletableFuture.supplyAsync`` takes in a reference to the
``ExecutorService`` which will provide a thread for the external operation to run on.
* An implementation of ``FlowExternalAsyncOperation`` (``RetrieveDataFromExternalSystem``) is created that calls the ``ExternalService.retrieveDataFromExternalSystem``.
* ``RetrieveDataFromExternalSystem`` is then passed into ``await`` to execute the code contained in ``RetrieveDataFromExternalSystem.execute``.
* The result of ``RetrieveDataFromExternalSystem.execute`` is then returned to the flow once its execution finishes.
Handling deduplication in external operations
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
A Flow has the ability to rerun from any point where it suspends. Due to this, a flow can execute code multiple times depending on where it
retries. For context contained inside a flow, values will be reset to their state recorded at the last suspension point. This makes most
properties existing inside a flow safe when retrying. External operations do not have the same guarantees as they are executed outside of
the context of flows.
External operations are provided with a ``deduplicationId`` to allow CorDapps to decide whether to run the operation again or return a
result retrieved from a previous attempt. How deduplication is handled depends on the CorDapp and how the external system works. For
example, an external system might already handle this scenario and return the result from a previous calculation or it could be idempotent
and can be safely executed multiple times.
.. warning::
There is no inbuilt deduplication for external operations. Any deduplication must be explicitly handled in whatever way is
appropriate for the CorDapp and external system.
The ``deduplicationId`` passed to an external operation is constructed from its calling flow's ID and the number of suspends the flow has
made. Therefore, the ``deduplicationId`` is guaranteed to be the same on a retry and will never be used again once the flow has successfully
reached its next suspension point.
.. note::
Any external operations that did not finish processing (or were kept in the flow hospital due to an error) will be retried upon node
restart.
Below are examples of how deduplication could be handled:
* The external system records successful computations and returns previous results if requested again.
* The external system is idempotent, meaning the computation can be made multiple times without altering any state (similar to the point above).
* An extra external service maintains a record of deduplication IDs.
* Recorded inside of the node's database.
.. note::
Handling deduplication on the external system's side is preferred compared to handling it inside of the node.
.. warning::
In-memory data structures should not be used for handling deduplication as their state will not survive node restarts.
.. _api_flows_guava_future_conversion:
Creating CompletableFutures from Guava's ListenableFutures
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The code below demonstrates how to convert a ``ListenableFuture`` into a ``CompletableFuture``, allowing the result to be executed using a
``FlowExternalAsyncOperation``.
.. container:: codeset
.. sourcecode:: kotlin
@CordaService
class ExternalService(serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
private val client: OkHttpClient = OkHttpClient()
// Guava's [ListeningExecutorService] created to supply a fixed number of threads
private val guavaExecutor: ListeningExecutorService = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
4,
ThreadFactoryBuilder().setNameFormat("guava-thread").build()
)
)
fun retrieveDataFromExternalSystem(deduplicationId: String, data: Data): CompletableFuture<Response> {
// Create a Guava [ListenableFuture]
val guavaFuture: ListenableFuture<Response> = guavaExecutor.submit(Callable<Response> {
try {
// [DeduplicationId] passed into the request so the external system can handle deduplication
client.newCall(
Request.Builder().url("https://externalsystem.com/endpoint/$deduplicationId").post(
RequestBody.create(
MediaType.parse("text/plain"), data.toString()
)
).build()
).execute()
} catch (e: IOException) {
// Handle checked exception
throw HospitalizeFlowException("External API call failed", e)
}
})
// Create a [CompletableFuture]
return object : CompletableFuture<Response>() {
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
return guavaFuture.cancel(mayInterruptIfRunning).also {
super.cancel(mayInterruptIfRunning)
}
}
}.also { completableFuture ->
// Create a callback that completes the returned [CompletableFuture] when the underlying [ListenableFuture] finishes
val callback = object : FutureCallback<Response> {
override fun onSuccess(result: Response?) {
completableFuture.complete(result)
}
override fun onFailure(t: Throwable) {
completableFuture.completeExceptionally(t)
}
}
// Register the callback
Futures.addCallback(guavaFuture, callback, guavaExecutor)
}
}
}
.. sourcecode:: java
@CordaService
public class ExternalService extends SingletonSerializeAsToken {
private OkHttpClient client = new OkHttpClient();
public ExternalService(AppServiceHub serviceHub) { }
private ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
4,
new ThreadFactoryBuilder().setNameFormat("guava-thread").build()
)
);
public CompletableFuture<Response> retrieveDataFromExternalSystem(String deduplicationId, Data data) {
// Create a Guava [ListenableFuture]
ListenableFuture<Response> guavaFuture = guavaExecutor.submit(() -> {
try {
// [DeduplicationId] passed into the request so the external system can handle deduplication
return client.newCall(
new Request.Builder().url("https://externalsystem.com/endpoint/" + deduplicationId).post(
RequestBody.create(
MediaType.parse("text/plain"), data.toString()
)
).build()
).execute();
} catch (IOException e) {
// Must handle checked exception
throw new HospitalizeFlowException("External API call failed", e);
}
});
// Create a [CompletableFuture]
CompletableFuture<Response> completableFuture = new CompletableFuture<Response>() {
// If the returned [CompletableFuture] is cancelled then the underlying [ListenableFuture] must be cancelled as well
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = guavaFuture.cancel(mayInterruptIfRunning);
super.cancel(mayInterruptIfRunning);
return result;
}
};
// Create a callback that completes the returned [CompletableFuture] when the underlying [ListenableFuture] finishes
FutureCallback<Response> callback = new FutureCallback<Response>() {
@Override
public void onSuccess(Response result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable t) {
completableFuture.completeExceptionally(t);
}
};
// Register the callback
Futures.addCallback(guavaFuture, callback, guavaExecutor);
return completableFuture;
}
}
In the code above:
* A ``ListenableFuture`` is created and receives a thread from the ``ListeningExecutorService``. This future does all the processing.
* A ``CompletableFuture`` is created, so that it can be returned to and executed by a ``FlowExternalAsyncOperation``.
* A ``FutureCallback`` is registered to the ``ListenableFuture``, which will complete the ``CompletableFuture`` (either successfully or
exceptionally) depending on the outcome of the ``ListenableFuture``.
* ``CompletableFuture.cancel`` is overridden to propagate its cancellation down to the underlying ``ListenableFuture``.
Concurrency, Locking and Waiting
--------------------------------

View File

@ -14,68 +14,10 @@ How to add suspending operations
--------------------------------
To add a suspending operation for a simple request-response type function that perhaps involves some external IO we can
use the internal ``FlowAsyncOperation`` interface.
use ``FlowExternalOperation`` or ``FlowExternalAsyncOperation``. These interfaces represent the public versions of the internal
``FlowAsyncOperation``.
.. container:: codeset
.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART FlowAsyncOperation
:end-before: DOCEND FlowAsyncOperation
Let's imagine we want to add a suspending operation that takes two integers and returns their sum. To do this we
implement ``FlowAsyncOperation``:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/kotlin/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART SummingOperation
:end-before: DOCEND SummingOperation
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/SummingOperation.java
:language: java
:start-after: DOCSTART SummingOperation
:end-before: DOCEND SummingOperation
As we can see the constructor of ``SummingOperation`` takes the two numbers, and the ``execute`` function simply returns
a future that is immediately completed by the result of summing the numbers. Note how we don't use ``@Suspendable`` on
``execute``, this is because we'll never suspend inside this function, the suspension will happen before we're calling
it.
Note also how the input numbers are stored in the class as fields. This is important, because in the flow's checkpoint
we'll store an instance of this class whenever we're suspending on such an operation. If the node fails or restarts
while the operation is underway this class will be deserialized from the checkpoint and ``execute`` will be called
again.
Now we can use the internal function ``executeAsync`` to execute this operation from a flow.
.. container:: codeset
.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/internal/FlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART executeAsync
:end-before: DOCEND executeAsync
It simply takes a ``FlowAsyncOperation`` and an optional flag we don't care about for now. We can use this function in a
flow:
.. container:: codeset
.. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/kotlin/tutorial/flowstatemachines/TutorialFlowAsyncOperation.kt
:language: kotlin
:start-after: DOCSTART ExampleSummingFlow
:end-before: DOCEND ExampleSummingFlow
.. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/java/tutorial/flowstatemachines/ExampleSummingFlow.java
:language: java
:start-after: DOCSTART ExampleSummingFlow
:end-before: DOCEND ExampleSummingFlow
That's it! Obviously this is a mostly useless example, but this is the basic code structure one could extend for heavier
computations/other IO. For example the function could call into a ``CordaService`` or something similar. One thing to
note is that the operation executed in ``execute`` must be redoable(= "idempotent") in case the node fails before the
next checkpoint is committed.
See :ref:`calling external systems inside of flows <api_flows_external_operations>` for more information on these public interfaces.
How to test
-----------

View File

@ -311,6 +311,13 @@ extraNetworkMapKeys
*Default:* not defined
.. _corda_configuration_flow_external_operation_thread_pool_size:
flowExternalOperationThreadPoolSize
The number of threads available to execute external operations called from flows. See the documentation on
:ref:`calling external systems inside of flows <api_flows_external_operations>` for more information.
*Default:* Set to the number of available cores on the machine the node is running on
flowMonitorPeriodMillis
Duration of the period suspended flows waiting for IO are logged.

View File

@ -1,38 +1,31 @@
package net.corda.docs.kotlin.tutorial.flowstatemachines
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.executeAsync
import java.util.concurrent.CompletableFuture
// DOCSTART SummingOperation
class SummingOperation(val a: Int, val b: Int) : FlowAsyncOperation<Int> {
override fun execute(deduplicationId: String): CordaFuture<Int> {
return doneFuture(a + b)
class SummingOperation(val a: Int, val b: Int) : FlowExternalAsyncOperation<Int> {
override fun execute(deduplicationId: String): CompletableFuture<Int> {
return CompletableFuture.completedFuture(a + b)
}
}
// DOCEND SummingOperation
// DOCSTART SummingOperationThrowing
class SummingOperationThrowing(val a: Int, val b: Int) : FlowAsyncOperation<Int> {
override fun execute(deduplicationId: String): CordaFuture<Int> {
class SummingOperationThrowing(val a: Int, val b: Int) : FlowExternalAsyncOperation<Int> {
override fun execute(deduplicationId: String): CompletableFuture<Int> {
throw IllegalStateException("You shouldn't be calling me")
}
}
// DOCEND SummingOperationThrowing
// DOCSTART ExampleSummingFlow
@StartableByRPC
class ExampleSummingFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
val answer = executeAsync(SummingOperation(1, 2))
val answer = await(SummingOperation(1, 2))
return answer // hopefully 3
}
}
// DOCEND ExampleSummingFlow

View File

@ -121,6 +121,7 @@ abstract class StatemachineErrorHandlingTest {
}
}
// Internal use for testing only!!
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts =

View File

@ -35,6 +35,7 @@ import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -335,13 +336,13 @@ class AsyncRetryFlow() : FlowLogic<String>(), IdempotentFlow {
val deduplicationIds = mutableSetOf<String>()
}
class RecordDeduplicationId: FlowAsyncOperation<String> {
override fun execute(deduplicationId: String): CordaFuture<String> {
class RecordDeduplicationId: FlowExternalAsyncOperation<String> {
override fun execute(deduplicationId: String): CompletableFuture<String> {
val dedupeIdIsNew = deduplicationIds.add(deduplicationId)
if (dedupeIdIsNew) {
throw ExceptionToCauseFiniteRetry()
}
return doneFuture(deduplicationId)
return CompletableFuture.completedFuture(deduplicationId)
}
}
@ -350,7 +351,7 @@ class AsyncRetryFlow() : FlowLogic<String>(), IdempotentFlow {
@Suspendable
override fun call(): String {
progressTracker.currentStep = FIRST_STEP
executeAsync(RecordDeduplicationId())
await(RecordDeduplicationId())
return "Result"
}
}

View File

@ -3,6 +3,7 @@ package net.corda.node.internal
import com.codahale.metrics.MetricRegistry
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.zaxxer.hikari.pool.HikariPool
import net.corda.confidential.SwapIdentitiesFlow
import net.corda.core.CordaException
@ -339,6 +340,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
*/
protected abstract val rxIoScheduler: Scheduler
val externalOperationExecutor = createExternalOperationExecutor(configuration.flowExternalOperationThreadPoolSize)
/**
* Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database.
@ -731,6 +734,17 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
).tokenize()
}
private fun createExternalOperationExecutor(numberOfThreads: Int): ExecutorService {
when (numberOfThreads) {
1 -> log.info("Flow external operation executor has $numberOfThreads thread")
else -> log.info("Flow external operation executor has $numberOfThreads threads")
}
return Executors.newFixedThreadPool(
numberOfThreads,
ThreadFactoryBuilder().setNameFormat("flow-external-operation-thread").build()
)
}
private fun isRunningSimpleNotaryService(configuration: NodeConfiguration): Boolean {
return configuration.notary != null && configuration.notary?.className == SimpleNotaryService::class.java.name
}
@ -1111,6 +1125,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
override val networkParametersService: NetworkParametersStorage get() = this@AbstractNode.networkParametersStorage
override val attachmentTrustCalculator: AttachmentTrustCalculator get() = this@AbstractNode.attachmentTrustCalculator
override val diagnosticsService: DiagnosticsService get() = this@AbstractNode.diagnosticsService
override val externalOperationExecutor: ExecutorService get() = this@AbstractNode.externalOperationExecutor
private lateinit var _myInfo: NodeInfo
override val myInfo: NodeInfo get() = _myInfo

View File

@ -45,6 +45,10 @@ object ConfigHelper {
val smartDevMode = CordaSystemUtils.isOsMac() || (CordaSystemUtils.isOsWindows() && !CordaSystemUtils.getOsName().toLowerCase().contains("server"))
val devModeConfig = ConfigFactory.parseMap(mapOf("devMode" to smartDevMode))
// Detect the number of cores
val coreCount = Runtime.getRuntime().availableProcessors()
val multiThreadingConfig = configOf("flowExternalOperationThreadPoolSize" to coreCount.toString())
val systemOverrides = ConfigFactory.systemProperties().cordaEntriesOnly()
val environmentOverrides = ConfigFactory.systemEnvironment().cordaEntriesOnly()
val finalConfig = configOf(
@ -55,6 +59,7 @@ object ConfigHelper {
.withFallback(environmentOverrides)
.withFallback(appConfig)
.withFallback(devModeConfig) // this needs to be after the appConfig, so it doesn't override the configured devMode
.withFallback(multiThreadingConfig)
.withFallback(defaultConfig)
.resolve()

View File

@ -88,6 +88,8 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer {
val blacklistedAttachmentSigningKeys: List<String>
val flowExternalOperationThreadPoolSize: Int
companion object {
// default to at least 8MB and a bit extra for larger heap sizes
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()

View File

@ -81,7 +81,8 @@ data class NodeConfigurationImpl(
override val networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings? =
Defaults.networkParameterAcceptanceSettings,
override val blacklistedAttachmentSigningKeys: List<String> = Defaults.blacklistedAttachmentSigningKeys,
override val configurationWithOptions: ConfigurationWithOptions
override val configurationWithOptions: ConfigurationWithOptions,
override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize
) : NodeConfiguration {
internal object Defaults {
val jmxMonitoringHttpPort: Int? = null
@ -117,6 +118,7 @@ data class NodeConfigurationImpl(
val cordappSignerKeyFingerprintBlacklist: List<String> = DEV_PUB_KEY_HASHES.map { it.toString() }
val networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings = NetworkParameterAcceptanceSettings()
val blacklistedAttachmentSigningKeys: List<String> = emptyList()
const val flowExternalOperationThreadPoolSize: Int = 1
fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)

View File

@ -63,6 +63,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
private val networkParameterAcceptanceSettings by nested(NetworkParameterAcceptanceSettingsSpec)
.optional()
.withDefaultValue(Defaults.networkParameterAcceptanceSettings)
private val flowExternalOperationThreadPoolSize by int().optional().withDefaultValue(Defaults.flowExternalOperationThreadPoolSize)
@Suppress("unused")
private val custom by nestedObject().optional()
@Suppress("unused")
@ -126,7 +127,8 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
cordappSignerKeyFingerprintBlacklist = configuration[cordappSignerKeyFingerprintBlacklist],
blacklistedAttachmentSigningKeys = configuration[blacklistedAttachmentSigningKeys],
networkParameterAcceptanceSettings = configuration[networkParameterAcceptanceSettings],
configurationWithOptions = ConfigurationWithOptions(configuration, Configuration.Validation.Options.defaults)
configurationWithOptions = ConfigurationWithOptions(configuration, Configuration.Validation.Options.defaults),
flowExternalOperationThreadPoolSize = configuration[flowExternalOperationThreadPoolSize]
))
} catch (e: Exception) {
return when (e) {

View File

@ -4,13 +4,19 @@ import co.paralleluniverse.fibers.Stack
import com.fasterxml.jackson.annotation.JsonAutoDetect
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.JsonUnwrapped
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.util.DefaultIndenter
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.BeanDescription
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.ObjectWriter
import com.fasterxml.jackson.databind.SerializationConfig
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier
@ -27,8 +33,16 @@ import net.corda.core.flows.FlowSession
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.node.AppServiceHub.Companion.SERVICE_PRIORITY_NORMAL
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.WaitForStateConsumption
import net.corda.core.internal.declaredField
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.objectOrNewInstance
import net.corda.core.internal.outputStream
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializedBytes
@ -45,7 +59,17 @@ import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess
import net.corda.node.internal.NodeStartup
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DataSessionMessage
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowSessionImpl
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.SubFlow
import net.corda.node.utilities.JVMAgentUtil.getJvmAgentProperties
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
@ -55,7 +79,7 @@ import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.ZipEntry
@ -97,6 +121,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
addSerializer(FlowSessionImplSerializer)
addSerializer(MapSerializer)
addSerializer(AttachmentSerializer)
setMixInAnnotation(FlowAsyncOperation::class.java, FlowAsyncOperationMixin::class.java)
setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java)
setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java)
})
@ -367,6 +392,15 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
val toLong: Long
}
@Suppress("unused")
private interface FlowAsyncOperationMixin {
@get:JsonIgnore
val serviceHub: ServiceHub
// [Any] used so this single mixin can serialize [FlowExternalOperation] and [FlowExternalAsyncOperation]
@get:JsonUnwrapped
val operation: Any
}
@JsonAutoDetect(getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
private interface FlowLogicMixin

View File

@ -55,7 +55,6 @@ class TransitionExecutorImpl(
} else {
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation
log.info("Error while executing $action, with event $event, erroring state", exception)
if(previousState.isRemoved && exception is OptimisticLockException) {
log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception)

View File

@ -1,13 +1,9 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.executeAsync
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
@ -20,6 +16,7 @@ import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutionException
import kotlin.test.assertFailsWith
@ -46,15 +43,15 @@ class FlowAsyncOperationTests {
val flow = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
executeAsync(ErroredExecute())
await(ErroredExecute())
}
}
assertFailsWith<ExecutionException> { aliceNode.services.startFlow(flow).resultFuture.get() }
}
private class ErroredExecute : FlowAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CordaFuture<Unit> {
private class ErroredExecute : FlowExternalAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CompletableFuture<Unit> {
throw Exception()
}
}
@ -64,7 +61,7 @@ class FlowAsyncOperationTests {
val flow = object : FlowLogic<Unit>() {
@Suspendable
override fun call() {
executeAsync(ErroredResult())
await(ErroredResult())
}
}
@ -77,7 +74,7 @@ class FlowAsyncOperationTests {
@Suspendable
override fun call() {
try {
executeAsync(ErroredResult())
await(ErroredResult())
} catch (e: SpecialException) {
// Suppress
}
@ -87,10 +84,10 @@ class FlowAsyncOperationTests {
aliceNode.services.startFlow(flow).resultFuture.get()
}
private class ErroredResult : FlowAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CordaFuture<Unit> {
val future = openFuture<Unit>()
future.setException(SpecialException())
private class ErroredResult : FlowExternalAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CompletableFuture<Unit> {
val future = CompletableFuture<Unit>()
future.completeExceptionally(SpecialException())
return future
}
}
@ -118,12 +115,12 @@ class FlowAsyncOperationTests {
@Suspendable
override fun call() {
val scv = serviceHub.cordaService(WorkerService::class.java)
executeAsync(WorkerServiceTask(completeAllTasks, scv))
await(WorkerServiceTask(completeAllTasks, scv))
}
}
private class WorkerServiceTask(val completeAllTasks: Boolean, val service: WorkerService) : FlowAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CordaFuture<Unit> {
private class WorkerServiceTask(val completeAllTasks: Boolean, val service: WorkerService) : FlowExternalAsyncOperation<Unit> {
override fun execute(deduplicationId: String): CompletableFuture<Unit> {
return service.performTask(completeAllTasks)
}
}
@ -131,17 +128,17 @@ class FlowAsyncOperationTests {
/** A dummy worker service that queues up tasks and allows clearing the entire task backlog. */
@CordaService
class WorkerService(val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {
private val pendingTasks = ConcurrentLinkedQueue<OpenFuture<Unit>>()
private val pendingTasks = ConcurrentLinkedQueue<CompletableFuture<Unit>>()
val pendingCount: Int get() = pendingTasks.count()
fun performTask(completeAllTasks: Boolean): CordaFuture<Unit> {
val taskFuture = openFuture<Unit>()
fun performTask(completeAllTasks: Boolean): CompletableFuture<Unit> {
val taskFuture = CompletableFuture<Unit>()
pendingTasks.add(taskFuture)
if (completeAllTasks) {
synchronized(this) {
while (!pendingTasks.isEmpty()) {
val fut = pendingTasks.poll()!!
fut.set(Unit)
fut.complete(Unit)
}
}
}

View File

@ -618,6 +618,7 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio
doReturn(null).whenever(it).devModeOptions
doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings
doReturn(rigorousMock<ConfigurationWithOptions>()).whenever(it).configurationWithOptions
doReturn(2).whenever(it).flowExternalOperationThreadPoolSize
}
}

View File

@ -1,5 +1,6 @@
package net.corda.testing.dsl
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.DoNotImplement
import net.corda.core.contracts.*
import net.corda.core.cordapp.CordappProvider
@ -26,6 +27,8 @@ import net.corda.testing.services.MockAttachmentStorage
import java.io.InputStream
import java.security.PublicKey
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
@ -93,6 +96,11 @@ data class TestTransactionDSLInterpreter private constructor(
ledgerInterpreter.getTransaction(id)
}
override val externalOperationExecutor: ExecutorService = Executors.newFixedThreadPool(
2,
ThreadFactoryBuilder().setNameFormat("flow-external-operation-thread").build()
)
override val attachmentTrustCalculator: AttachmentTrustCalculator =
ledgerInterpreter.services.attachments.let {
// Wrapping to a [InternalMockAttachmentStorage] is needed to prevent leaking internal api

View File

@ -9,8 +9,10 @@ import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doAnswer
import com.nhaarman.mockito_kotlin.mock
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.internal.valueAs
import net.corda.client.rpc.RPCException
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
@ -29,6 +31,7 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.NodeStartup
import net.corda.node.services.Permissions
import net.corda.node.services.Permissions.Companion.all
@ -59,8 +62,12 @@ import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import org.junit.jupiter.api.assertThrows
import org.junit.rules.TemporaryFolder
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeoutException
import java.util.zip.ZipInputStream
import javax.security.auth.x500.X500Principal
import kotlin.test.assertEquals
@ -284,6 +291,75 @@ class InteractiveShellIntegrationTest {
}
}
@Test
fun `dumpCheckpoints correctly serializes FlowExternalOperations`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
(alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).createDirectories()
alice.rpc.startFlow(::ExternalOperationFlow)
ExternalOperation.lock.acquire()
InteractiveShell.runDumpCheckpoints(alice.rpc as InternalCordaRPCOps)
ExternalOperation.lock2.release()
val zipFile = (alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
val json = ZipInputStream(zipFile.inputStream()).use { zip ->
zip.nextEntry
ObjectMapper().readTree(zip)
}
assertEquals("hello there", json["suspendedOn"]["customOperation"]["operation"]["a"].asText())
assertEquals(123, json["suspendedOn"]["customOperation"]["operation"]["b"].asInt())
assertEquals("please work", json["suspendedOn"]["customOperation"]["operation"]["c"]["d"].asText())
assertEquals("I beg you", json["suspendedOn"]["customOperation"]["operation"]["c"]["e"].asText())
}
}
@Test
fun `dumpCheckpoints correctly serializes FlowExternalAsyncOperations`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
(alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).createDirectories()
alice.rpc.startFlow(::ExternalAsyncOperationFlow)
ExternalAsyncOperation.lock.acquire()
InteractiveShell.runDumpCheckpoints(alice.rpc as InternalCordaRPCOps)
ExternalAsyncOperation.future.complete(null)
val zipFile = (alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
val json = ZipInputStream(zipFile.inputStream()).use { zip ->
zip.nextEntry
ObjectMapper().readTree(zip)
}
assertEquals("hello there", json["suspendedOn"]["customOperation"]["operation"]["a"].asText())
assertEquals(123, json["suspendedOn"]["customOperation"]["operation"]["b"].asInt())
assertEquals("please work", json["suspendedOn"]["customOperation"]["operation"]["c"]["d"].asText())
assertEquals("I beg you", json["suspendedOn"]["customOperation"]["operation"]["c"]["e"].asText())
}
}
@Test
fun `dumpCheckpoints correctly serializes WaitForStateConsumption`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
(alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).createDirectories()
val stateRefs = setOf(
StateRef(SecureHash.randomSHA256(), 0),
StateRef(SecureHash.randomSHA256(), 1),
StateRef(SecureHash.randomSHA256(), 2)
)
assertThrows<TimeoutException> {
alice.rpc.startFlow(::WaitForStateConsumptionFlow, stateRefs).returnValue.getOrThrow(10.seconds)
}
InteractiveShell.runDumpCheckpoints(alice.rpc as InternalCordaRPCOps)
val zipFile = (alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
val json = ZipInputStream(zipFile.inputStream()).use { zip ->
zip.nextEntry
ObjectMapper().readTree(zip)
}
assertEquals(stateRefs, json["suspendedOn"]["waitForStateConsumption"].valueAs<List<StateRef>>(inputObjectMapper).toSet())
}
}
@Test
fun `dumpCheckpoints creates zip with json file for suspended flow`() {
val user = User("u", "p", setOf(all()))
@ -445,5 +521,54 @@ class InteractiveShellIntegrationTest {
override val participants: List<AbstractParty>
) : LinearState
@StartableByRPC
class ExternalAsyncOperationFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(ExternalAsyncOperation("hello there", 123, Data("please work", "I beg you")))
}
}
class ExternalAsyncOperation(val a: String, val b: Int, val c: Data): FlowExternalAsyncOperation<Unit> {
companion object {
val future = CompletableFuture<Unit>()
val lock = Semaphore(0)
}
override fun execute(deduplicationId: String): CompletableFuture<Unit> {
return future.also { lock.release() }
}
}
class Data(val d: String, val e: String)
@StartableByRPC
class ExternalOperationFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(ExternalOperation("hello there", 123, Data("please work", "I beg you")))
}
}
class ExternalOperation(val a: String, val b: Int, val c: Data): FlowExternalOperation<Unit> {
companion object {
val lock = Semaphore(0)
val lock2 = Semaphore(0)
}
override fun execute(deduplicationId: String) {
lock.release()
lock2.acquire()
}
}
@StartableByRPC
class WaitForStateConsumptionFlow(private val stateRefs: Set<StateRef>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
waitForStateConsumption(stateRefs)
}
}
}