mirror of
https://github.com/corda/corda.git
synced 2025-06-17 22:58:19 +00:00
Throw a diagnostic exception if your FlowLogic.call method is not marked as @Suspendable.
This catches a bunch of unit tests where it's missing and also resolves an issue I saw Roger hit the other day.
This commit is contained in:
@ -235,15 +235,21 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
|
|||||||
|
|
||||||
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
private fun onTimeReached(scheduledState: ScheduledStateRef) {
|
||||||
serverThread.execute {
|
serverThread.execute {
|
||||||
|
var flowName: String? = "(unknown)"
|
||||||
|
try {
|
||||||
services.database.transaction {
|
services.database.transaction {
|
||||||
val scheduledFlow = getScheduledFlow(scheduledState)
|
val scheduledFlow = getScheduledFlow(scheduledState)
|
||||||
if (scheduledFlow != null) {
|
if (scheduledFlow != null) {
|
||||||
|
flowName = scheduledFlow.javaClass.name
|
||||||
val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture
|
val future = services.startFlow(scheduledFlow, FlowInitiator.Scheduled(scheduledState)).resultFuture
|
||||||
future.then {
|
future.then {
|
||||||
unfinishedSchedules.countDown()
|
unfinishedSchedules.countDown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
log.error("Failed to start scheduled flow $flowName for $scheduledState due to an internal error", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
|
|||||||
|
|
||||||
import co.paralleluniverse.fibers.Fiber
|
import co.paralleluniverse.fibers.Fiber
|
||||||
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import co.paralleluniverse.fibers.instrument.SuspendableHelper
|
import co.paralleluniverse.fibers.instrument.SuspendableHelper
|
||||||
import co.paralleluniverse.strands.Strand
|
import co.paralleluniverse.strands.Strand
|
||||||
import com.codahale.metrics.Gauge
|
import com.codahale.metrics.Gauge
|
||||||
@ -427,6 +428,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun initFiber(fiber: FlowStateMachineImpl<*>) {
|
private fun initFiber(fiber: FlowStateMachineImpl<*>) {
|
||||||
|
verifyFlowLogicIsSuspendable(fiber.logic)
|
||||||
fiber.database = database
|
fiber.database = database
|
||||||
fiber.serviceHub = serviceHub
|
fiber.serviceHub = serviceHub
|
||||||
fiber.actionOnSuspend = { ioRequest ->
|
fiber.actionOnSuspend = { ioRequest ->
|
||||||
@ -458,6 +460,19 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun verifyFlowLogicIsSuspendable(logic: FlowLogic<Any?>) {
|
||||||
|
// Quasar requires (in Java 8) that at least the call method be annotated suspendable. Unfortunately, it's
|
||||||
|
// easy to forget to add this when creating a new flow, so we check here to give the user a better error.
|
||||||
|
//
|
||||||
|
// The Kotlin compiler can sometimes generate a synthetic bridge method from a single call declaration, which
|
||||||
|
// forwards to the void method and then returns Unit. However annotations do not get copied across to this
|
||||||
|
// bridge, so we have to do a more complex scan here.
|
||||||
|
val call = logic.javaClass.methods.first { !it.isSynthetic && it.name == "call" && it.parameterCount == 0 }
|
||||||
|
if (call.getAnnotation(Suspendable::class.java) == null) {
|
||||||
|
throw FlowException("${logic.javaClass.name}.call() is not annotated as @Suspendable. Please fix this.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, result: Try<*>, propagated: Boolean) {
|
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, result: Try<*>, propagated: Boolean) {
|
||||||
openSessions.values.removeIf { session ->
|
openSessions.values.removeIf { session ->
|
||||||
if (session.fiber == fiber) {
|
if (session.fiber == fiber) {
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package net.corda.node.services.events
|
package net.corda.node.services.events
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.FlowLogicRef
|
import net.corda.core.flows.FlowLogicRef
|
||||||
@ -136,6 +137,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class TestFlowLogic(val increment: Int = 1) : FlowLogic<Unit>() {
|
class TestFlowLogic(val increment: Int = 1) : FlowLogic<Unit>() {
|
||||||
|
@Suspendable
|
||||||
override fun call() {
|
override fun call() {
|
||||||
(serviceHub as TestReference).testReference.calls += increment
|
(serviceHub as TestReference).testReference.calls += increment
|
||||||
(serviceHub as TestReference).testReference.countDown.countDown()
|
(serviceHub as TestReference).testReference.countDown.countDown()
|
||||||
|
@ -913,6 +913,7 @@ class FlowFrameworkTests {
|
|||||||
override val progressTracker: ProgressTracker = ProgressTracker(START_STEP)
|
override val progressTracker: ProgressTracker = ProgressTracker(START_STEP)
|
||||||
lateinit var exceptionThrown: E
|
lateinit var exceptionThrown: E
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
override fun call(): Nothing {
|
override fun call(): Nothing {
|
||||||
progressTracker.currentStep = START_STEP
|
progressTracker.currentStep = START_STEP
|
||||||
exceptionThrown = exception()
|
exceptionThrown = exception()
|
||||||
|
@ -23,6 +23,7 @@ object UpdateBusinessDayFlow {
|
|||||||
|
|
||||||
@InitiatedBy(Broadcast::class)
|
@InitiatedBy(Broadcast::class)
|
||||||
private class UpdateBusinessDayHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
private class UpdateBusinessDayHandler(val otherParty: Party) : FlowLogic<Unit>() {
|
||||||
|
@Suspendable
|
||||||
override fun call() {
|
override fun call() {
|
||||||
val message = receive<UpdateBusinessDayMessage>(otherParty).unwrap { it }
|
val message = receive<UpdateBusinessDayMessage>(otherParty).unwrap { it }
|
||||||
(serviceHub.clock as TestClock).updateDate(message.date)
|
(serviceHub.clock as TestClock).updateDate(message.date)
|
||||||
|
Reference in New Issue
Block a user