State machines: thread safety and a few more comments.

This commit is contained in:
Mike Hearn 2015-12-15 13:01:57 +01:00
parent 3c578550a9
commit 020a594a60

View File

@ -27,6 +27,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.Executor
import javax.annotation.concurrent.ThreadSafe
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [ProtocolStateMachine] objects.
@ -41,18 +42,24 @@ import java.util.concurrent.Executor
*
* TODO: The framework should propagate exceptions and handle error handling automatically.
* TODO: This needs extension to the >2 party case.
* TODO: Thread safety
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
* continuation is always unique?
*/
@ThreadSafe
class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor) {
// This map is backed by a database and will be used to store serialised state machines to disk, so we can resurrect
// them across node restarts.
private val checkpointsMap = serviceHub.storageService.getMap<SecureHash, ByteArray>("state machines")
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private val _stateMachines = ArrayList<ProtocolStateMachine<*,*>>()
private val _stateMachines = Collections.synchronizedList(ArrayList<ProtocolStateMachine<*,*>>())
/** Returns a snapshot of the currently registered state machines. */
val stateMachines: List<ProtocolStateMachine<*,*>> get() = ArrayList(_stateMachines)
val stateMachines: List<ProtocolStateMachine<*,*>> get() {
synchronized(_stateMachines) {
return ArrayList(_stateMachines)
}
}
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
private class Checkpoint(
@ -151,6 +158,8 @@ class StateMachineManager(val serviceHub: ServiceHub, val runInThread: Executor)
}
private fun persistCheckpoint(prev: ByteArray?, new: ByteArray) {
// It's OK for this to be unsynchronised, as the prev/new byte arrays are specific to a continuation instance,
// and the underlying map provided by the database layer is expected to be thread safe.
if (prev != null)
checkpointsMap.remove(SecureHash.sha256(prev))
checkpointsMap[SecureHash.sha256(new)] = new
@ -237,12 +246,16 @@ abstract class ProtocolStateMachine<T, R> : Runnable {
// These fields shouldn't be serialised.
@Transient private var _resultFuture: SettableFuture<R> = SettableFuture.create<R>()
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() = _resultFuture
/** This field is initialised by the framework to point to various infrastructure submodules. */
@Transient lateinit var serviceHub: ServiceHub
abstract fun call(args: T): R
override fun run() {
// TODO: Catch any exceptions here and put them in the future.
val r = call(Continuation.getContext() as T)
if (r != null)
_resultFuture.set(r)