node: wait for last checkpoint remove before completing state machine future, fixes race in TwoPartyTradeProtocolTests

This commit is contained in:
Andras Slemmer
2016-06-10 12:26:36 +01:00
parent fe83e41f52
commit 0dda3b2473

View File

@ -6,7 +6,9 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.codahale.metrics.Gauge import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.google.common.base.Throwables import com.google.common.base.Throwables
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.abbreviate import com.r3corda.core.abbreviate
import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.runOnNextMessage
@ -62,6 +64,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
// property. // property.
private val stateMachines = synchronizedMap(LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>()) private val stateMachines = synchronizedMap(LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>())
// A map from fibers to futures that will be completed when the last corresponding checkpoint is removed
private val finalCheckpointRemovedFutures = synchronizedMap(HashMap<ProtocolStateMachineImpl<*>, SettableFuture<Unit>>())
// Monitoring support. // Monitoring support.
private val metrics = serviceHub.monitoringService.metrics private val metrics = serviceHub.monitoringService.metrics
@ -178,12 +183,15 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) { private fun initFiber(psm: ProtocolStateMachineImpl<*>, checkpoint: Checkpoint?) {
stateMachines[psm] = checkpoint stateMachines[psm] = checkpoint
notifyChangeObservers(psm, AddOrRemove.ADD) notifyChangeObservers(psm, AddOrRemove.ADD)
val finalCheckpointRemovedFuture: SettableFuture<Unit> = SettableFuture.create()
finalCheckpointRemovedFutures[psm] = finalCheckpointRemovedFuture
psm.resultFuture.then(executor) { psm.resultFuture.then(executor) {
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
val finalCheckpoint = stateMachines.remove(psm) val finalCheckpoint = stateMachines.remove(psm)
if (finalCheckpoint != null) { if (finalCheckpoint != null) {
checkpointStorage.removeCheckpoint(finalCheckpoint) checkpointStorage.removeCheckpoint(finalCheckpoint)
} }
finalCheckpointRemovedFuture.set(Unit)
totalFinishedProtocols.inc() totalFinishedProtocols.inc()
notifyChangeObservers(psm, AddOrRemove.REMOVE) notifyChangeObservers(psm, AddOrRemove.REMOVE)
} }
@ -206,7 +214,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
} }
totalStartedProtocols.inc() totalStartedProtocols.inc()
} }
return fiber.resultFuture val finalCheckpointRemovedFuture = finalCheckpointRemovedFutures.remove(fiber)
return Futures.transformAsync(finalCheckpointRemovedFuture, { fiber.resultFuture })
} catch(e: Throwable) { } catch(e: Throwable) {
e.printStackTrace() e.printStackTrace()
throw e throw e