From 020a594a601ffef0f0deb1f20d19484e841f7eb5 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Tue, 15 Dec 2015 13:01:57 +0100 Subject: [PATCH] State machines: thread safety and a few more comments. --- .../kotlin/core/messaging/StateMachines.kt | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/core/messaging/StateMachines.kt b/src/main/kotlin/core/messaging/StateMachines.kt index e3e7331354..82562dca9f 100644 --- a/src/main/kotlin/core/messaging/StateMachines.kt +++ b/src/main/kotlin/core/messaging/StateMachines.kt @@ -27,6 +27,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.util.* import java.util.concurrent.Executor +import javax.annotation.concurrent.ThreadSafe /** * A StateMachineManager is responsible for coordination and persistence of multiple [ProtocolStateMachine] objects. @@ -41,18 +42,24 @@ import java.util.concurrent.Executor * * TODO: The framework should propagate exceptions and handle error handling automatically. * TODO: This needs extension to the >2 party case. - * TODO: Thread safety + * TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised + * continuation is always unique? */ +@ThreadSafe class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) { // This map is backed by a database and will be used to store serialised state machines to disk, so we can resurrect // them across node restarts. private val checkpointsMap = serviceHub.storageService.getMap("state machines") // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // property. - private val _stateMachines = ArrayList>() + private val _stateMachines = Collections.synchronizedList(ArrayList>()) /** Returns a snapshot of the currently registered state machines. */ - val stateMachines: List> get() = ArrayList(_stateMachines) + val stateMachines: List> get() { + synchronized(_stateMachines) { + return ArrayList(_stateMachines) + } + } // This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo). private class Checkpoint( @@ -151,6 +158,8 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) } private fun persistCheckpoint(prev: ByteArray?, new: ByteArray) { + // It's OK for this to be unsynchronised, as the prev/new byte arrays are specific to a continuation instance, + // and the underlying map provided by the database layer is expected to be thread safe. if (prev != null) checkpointsMap.remove(SecureHash.sha256(prev)) checkpointsMap[SecureHash.sha256(new)] = new @@ -237,12 +246,16 @@ abstract class ProtocolStateMachine : Runnable { // These fields shouldn't be serialised. @Transient private var _resultFuture: SettableFuture = SettableFuture.create() + /** This future will complete when the call method returns. */ val resultFuture: ListenableFuture get() = _resultFuture + + /** This field is initialised by the framework to point to various infrastructure submodules. */ @Transient lateinit var serviceHub: ServiceHub abstract fun call(args: T): R override fun run() { + // TODO: Catch any exceptions here and put them in the future. val r = call(Continuation.getContext() as T) if (r != null) _resultFuture.set(r)