SMM: Better fiber scheduler that always schedules onto the SMM thread.

This commit is contained in:
Mike Hearn 2016-04-25 16:43:46 +02:00
parent 746aca8290
commit e5a0a211da
2 changed files with 8 additions and 8 deletions

View File

@ -7,7 +7,6 @@ import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Input
import com.google.common.base.Throwables import com.google.common.base.Throwables
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import core.crypto.SecureHash import core.crypto.SecureHash
import core.crypto.sha256 import core.crypto.sha256
import core.node.services.ServiceHub import core.node.services.ServiceHub
@ -54,6 +53,10 @@ import javax.annotation.concurrent.ThreadSafe
*/ */
@ThreadSafe @ThreadSafe
class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExecutor) { class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExecutor) {
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
val scheduler = FiberScheduler()
// This map is backed by a database and will be used to store serialised state machines to disk, so we can resurrect // This map is backed by a database and will be used to store serialised state machines to disk, so we can resurrect
// them across node restarts. // them across node restarts.
private val checkpointsMap = serviceHub.storageService.stateMachines private val checkpointsMap = serviceHub.storageService.stateMachines
@ -125,7 +128,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" } logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpointKey) { iterateStateMachine(psm, serviceHub.networkService, logger, obj, checkpointKey) {
try { try {
Fiber.unparkDeserialized(it, SameThreadFiberScheduler) Fiber.unparkDeserialized(it, scheduler)
} catch(e: Throwable) { } catch(e: Throwable) {
logError(e, logger, obj, topic, it) logError(e, logger, obj, topic, it)
} }
@ -159,7 +162,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> { fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
try { try {
val logger = LoggerFactory.getLogger(loggerName) val logger = LoggerFactory.getLogger(loggerName)
val fiber = ProtocolStateMachine(logic) val fiber = ProtocolStateMachine(logic, scheduler)
// Need to add before iterating in case of immediate completion // Need to add before iterating in case of immediate completion
_stateMachines.add(logic) _stateMachines.add(logic)
executor.executeASAP { executor.executeASAP {
@ -263,9 +266,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
} }
} }
// TODO: Override more of this to avoid the case where Strand.sleep triggers a call to a scheduler that then runs on the wrong thread.
object SameThreadFiberScheduler : FiberExecutorScheduler("Same thread scheduler", MoreExecutors.directExecutor())
// TODO: Clean this up // TODO: Clean this up
open class FiberRequest(val topic: String, val destination: MessageRecipients?, open class FiberRequest(val topic: String, val destination: MessageRecipients?,
val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) { val sessionIDForSend: Long, val sessionIDForReceive: Long, val obj: Any?) {
@ -289,7 +289,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
obj: Any? obj: Any?
) : FiberRequest(topic, destination, sessionIDForSend, -1, obj) ) : FiberRequest(topic, destination, sessionIDForSend, -1, obj)
} }
} }
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem") class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")

View File

@ -1,6 +1,7 @@
package core.protocols package core.protocols
import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.io.Output
@ -22,7 +23,7 @@ import java.io.ByteArrayOutputStream
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost * a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to. * logic element gets to return the value that the entire state machine resolves to.
*/ */
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>) : Fiber<R>("protocol", StateMachineManager.SameThreadFiberScheduler) { class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler) : Fiber<R>("protocol", scheduler) {
// These fields shouldn't be serialised, so they are marked @Transient. // These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendFunc: ((result: StateMachineManager.FiberRequest, serFiber: ByteArray) -> Unit)? = null @Transient private var suspendFunc: ((result: StateMachineManager.FiberRequest, serFiber: ByteArray) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null @Transient private var resumeWithObject: Any? = null