diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index 1d2ad91afa..ed2e4f36f6 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -6,7 +6,9 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo import com.google.common.base.Throwables +import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture import com.r3corda.core.abbreviate import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.runOnNextMessage @@ -62,6 +64,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService // property. private val stateMachines = synchronizedMap(LinkedHashMap, Checkpoint>()) + // A map from fibers to futures that will be completed when the last corresponding checkpoint is removed + private val finalCheckpointRemovedFutures = synchronizedMap(HashMap, SettableFuture>()) + // Monitoring support. private val metrics = serviceHub.monitoringService.metrics @@ -178,12 +183,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) { stateMachines[psm] = checkpoint notifyChangeObservers(psm, AddOrRemove.ADD) + val finalCheckpointRemovedFuture: SettableFuture = SettableFuture.create() + finalCheckpointRemovedFutures[psm] = finalCheckpointRemovedFuture psm.resultFuture.then(executor) { psm.logic.progressTracker?.currentStep = ProgressTracker.DONE val finalCheckpoint = stateMachines.remove(psm) if (finalCheckpoint != null) { checkpointStorage.removeCheckpoint(finalCheckpoint) } + finalCheckpointRemovedFuture.set(Unit) totalFinishedProtocols.inc() notifyChangeObservers(psm, AddOrRemove.REMOVE) } @@ -206,7 +214,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService } totalStartedProtocols.inc() } - return fiber.resultFuture + val finalCheckpointRemovedFuture = finalCheckpointRemovedFutures.remove(fiber) + return Futures.transformAsync(finalCheckpointRemovedFuture, { fiber.resultFuture }) } catch(e: Throwable) { e.printStackTrace() throw e