[CORDA-2265]: Nodes hangs when using AppServiceHub to start a flow in a blocking way (fix) (#4376)

This commit is contained in:
Michele Sollecito 2018-12-06 15:13:32 +00:00 committed by GitHub
parent c46fde1133
commit e2ea97bae7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 2 deletions

View File

@ -28,6 +28,8 @@ Unreleased
* Improved exception thrown by `AttachmentsClassLoader` when an attachment cannot be used because its uploader is not trusted. * Improved exception thrown by `AttachmentsClassLoader` when an attachment cannot be used because its uploader is not trusted.
* Fixed deadlocks generated by starting flow from within CordaServices.
* Marked the ``Attachment`` interface as ``@DoNotImplement`` because it is not meant to be extended by CorDapp developers. If you have already * Marked the ``Attachment`` interface as ``@DoNotImplement`` because it is not meant to be extended by CorDapp developers. If you have already
done so, please get in contact on the usual communication channels. done so, please get in contact on the usual communication channels.

View File

@ -0,0 +1,55 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StartableByService
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
class CordaServiceFlowTests {
@Test
fun `corda service can start a flow and wait for it`() {
driver(DriverParameters(startNodesInProcess = true)) {
val node = startNode().getOrThrow()
val text = "191ejodaimadc8i"
val length = node.rpc.startFlow(::ComputeTextLengthThroughCordaService, text).returnValue.getOrThrow()
assertThat(length).isEqualTo(text.length)
}
}
@StartableByRPC
class ComputeTextLengthThroughCordaService(private val text: String) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
val service = serviceHub.cordaService(TextLengthComputingService::class.java)
return service.computeLength(text)
}
}
@StartableByService
class ActuallyComputeTextLength(private val text: String) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
return text.length
}
}
@CordaService
class TextLengthComputingService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
fun computeLength(text: String): Int {
// Just to check this works with Quasar.
require(text.isNotEmpty()) { "Length must be at least 1." }
return services.startFlow(ActuallyComputeTextLength(text)).returnValue.toCompletableFuture().getOrThrow()
}
}
}

View File

@ -17,6 +17,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.notary.NotaryService
@ -601,8 +602,16 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
} }
override fun <T> startFlow(flow: FlowLogic<T>): FlowHandle<T> { override fun <T> startFlow(flow: FlowLogic<T>): FlowHandle<T> {
val parentFlow = FlowLogic.currentTopLevel
return if (parentFlow != null) {
val result = parentFlow.subFlow(flow)
// Accessing the flow id must happen after the flow has started.
val flowId = flow.runId
FlowHandleImpl(flowId, doneFuture(result))
} else {
val stateMachine = startFlowChecked(flow) val stateMachine = startFlowChecked(flow)
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}
} }
private fun <T> startFlowChecked(flow: FlowLogic<T>): FlowStateMachine<T> { private fun <T> startFlowChecked(flow: FlowLogic<T>): FlowStateMachine<T> {