From 7a9ce48996b150dfb81a0949a05958bae6a3e2c8 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 2 Feb 2018 17:00:37 +0000 Subject: [PATCH] Only emit SM Add events when the flow signals that it started --- .../services/statemachine/MultiThreadedStateMachineManager.kt | 4 +++- .../statemachine/SingleThreadedStateMachineManager.kt | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index ad14c1c36d..cc05ea8b9d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -265,6 +265,9 @@ class MultiThreadedStateMachineManager( override fun signalFlowHasStarted(flowId: StateMachineRunId) { concurrentBox.concurrent { startedFutures.remove(flowId)?.set(Unit) + flows[flowId]?.let { flow -> + changesPublisher.onNext(StateMachineManager.Change.Add(flow.fiber.logic)) + } } } @@ -571,7 +574,6 @@ class MultiThreadedStateMachineManager( Fiber.unparkDeserialized(flow.fiber, scheduler) } } - changesPublisher.onNext(StateMachineManager.Change.Add(flow.fiber.logic)) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index e1f308f881..78c666f985 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -264,6 +264,9 @@ class SingleThreadedStateMachineManager( override fun signalFlowHasStarted(flowId: StateMachineRunId) { mutex.locked { startedFutures.remove(flowId)?.set(Unit) + flows[flowId]?.let { flow -> + changesPublisher.onNext(StateMachineManager.Change.Add(flow.fiber.logic)) + } } } @@ -574,7 +577,6 @@ class SingleThreadedStateMachineManager( Fiber.unparkDeserialized(flow.fiber, scheduler) } } - changesPublisher.onNext(StateMachineManager.Change.Add(flow.fiber.logic)) } } }