From e2ea97bae7260ff223e60aa5359ba11a137eb43d Mon Sep 17 00:00:00 2001 From: Michele Sollecito Date: Thu, 6 Dec 2018 15:13:32 +0000 Subject: [PATCH] [CORDA-2265]: Nodes hangs when using AppServiceHub to start a flow in a blocking way (fix) (#4376) --- docs/source/changelog.rst | 2 + .../node/services/CordaServiceFlowTests.kt | 55 +++++++++++++++++++ .../net/corda/node/internal/AbstractNode.kt | 13 ++++- 3 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/CordaServiceFlowTests.kt diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 7e7f07ed1c..bc361a6c35 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -28,6 +28,8 @@ Unreleased * Improved exception thrown by `AttachmentsClassLoader` when an attachment cannot be used because its uploader is not trusted. +* Fixed deadlocks generated by starting flow from within CordaServices. + * Marked the ``Attachment`` interface as ``@DoNotImplement`` because it is not meant to be extended by CorDapp developers. If you have already done so, please get in contact on the usual communication channels. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceFlowTests.kt new file mode 100644 index 0000000000..2c9e27d1ce --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceFlowTests.kt @@ -0,0 +1,55 @@ +package net.corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.StartableByService +import net.corda.core.messaging.startFlow +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.getOrThrow +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +class CordaServiceFlowTests { + @Test + fun `corda service can start a flow and wait for it`() { + driver(DriverParameters(startNodesInProcess = true)) { + val node = startNode().getOrThrow() + val text = "191ejodaimadc8i" + + val length = node.rpc.startFlow(::ComputeTextLengthThroughCordaService, text).returnValue.getOrThrow() + + assertThat(length).isEqualTo(text.length) + } + } + + @StartableByRPC + class ComputeTextLengthThroughCordaService(private val text: String) : FlowLogic() { + @Suspendable + override fun call(): Int { + val service = serviceHub.cordaService(TextLengthComputingService::class.java) + return service.computeLength(text) + } + } + + @StartableByService + class ActuallyComputeTextLength(private val text: String) : FlowLogic() { + @Suspendable + override fun call(): Int { + return text.length + } + } + + @CordaService + class TextLengthComputingService(private val services: AppServiceHub) : SingletonSerializeAsToken() { + fun computeLength(text: String): Int { + // Just to check this works with Quasar. + require(text.isNotEmpty()) { "Length must be at least 1." } + return services.startFlow(ActuallyComputeTextLength(text)).returnValue.toCompletableFuture().getOrThrow() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index bd1829a3a3..bb358b6993 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -17,6 +17,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.* +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.notary.NotaryService @@ -601,8 +602,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } override fun startFlow(flow: FlowLogic): FlowHandle { - val stateMachine = startFlowChecked(flow) - return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) + val parentFlow = FlowLogic.currentTopLevel + return if (parentFlow != null) { + val result = parentFlow.subFlow(flow) + // Accessing the flow id must happen after the flow has started. + val flowId = flow.runId + FlowHandleImpl(flowId, doneFuture(result)) + } else { + val stateMachine = startFlowChecked(flow) + FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) + } } private fun startFlowChecked(flow: FlowLogic): FlowStateMachine {