logger name moved into protocol state machine

This commit is contained in:
Shams Asari 2016-05-09 09:57:42 +01:00
parent 20c6be193a
commit 2e7b004eb5
2 changed files with 41 additions and 40 deletions

View File

@ -19,8 +19,6 @@ import core.serialization.serialize
import core.utilities.AffinityExecutor
import core.utilities.ProgressTracker
import core.utilities.trace
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.PrintWriter
import java.io.StringWriter
import java.util.*
@ -69,7 +67,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
private val metrics = serviceHub.monitoringService.metrics
init {
metrics.register("Protocols.InFlight", Gauge<kotlin.Int> { stateMachines.size })
metrics.register("Protocols.InFlight", Gauge<Int> { stateMachines.size })
}
private val checkpointingMeter = metrics.meter("Protocols.Checkpointing Rate")
@ -94,7 +92,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
private class Checkpoint(
val serialisedFiber: ByteArray,
val loggerName: String,
val awaitingTopic: String,
val awaitingObjectOfType: String // java class name
)
@ -116,7 +113,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
// so we can deserialised the nested stream that holds the fiber.
val psm = deserializeFiber(checkpoint.serialisedFiber)
stateMachines.add(psm.logic)
val logger = LoggerFactory.getLogger(checkpoint.loggerName)
val awaitingObjectOfType = Class.forName(checkpoint.awaitingObjectOfType)
val topic = checkpoint.awaitingTopic
@ -126,25 +122,25 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
if (!awaitingObjectOfType.isInstance(obj))
throw ClassCastException("Received message of unexpected type: ${obj.javaClass.name} vs ${awaitingObjectOfType.name}")
logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
iterateStateMachine(psm, logger, obj, checkpointKey) {
psm.logger.trace { "<- $topic : message of type ${obj.javaClass.name}" }
iterateStateMachine(psm, obj, checkpointKey) {
try {
Fiber.unparkDeserialized(it, scheduler)
} catch(e: Throwable) {
logError(e, logger, obj, topic, it)
logError(e, obj, topic, it)
}
}
}
}
}
private fun logError(e: Throwable, logger: Logger, obj: Any, topic: String, psm: ProtocolStateMachine<*>) {
logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
private fun logError(e: Throwable, obj: Any, topic: String, psm: ProtocolStateMachine<*>) {
psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " +
"when handling a message of type ${obj.javaClass.name} on topic $topic")
if (logger.isTraceEnabled) {
if (psm.logger.isTraceEnabled) {
val s = StringWriter()
Throwables.getRootCause(e).printStackTrace(PrintWriter(s))
logger.trace("Stack trace of protocol error is: $s")
psm.logger.trace("Stack trace of protocol error is: $s")
}
}
@ -162,12 +158,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
*/
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
try {
val logger = LoggerFactory.getLogger(loggerName)
val fiber = ProtocolStateMachine(logic, scheduler)
val fiber = ProtocolStateMachine(logic, scheduler, loggerName)
// Need to add before iterating in case of immediate completion
stateMachines.add(logic)
executor.executeASAP {
iterateStateMachine(fiber, logger, null, null) {
iterateStateMachine(fiber, null, null) {
it.start()
}
totalStartedProtocols.inc()
@ -191,7 +186,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
}
private fun iterateStateMachine(psm: ProtocolStateMachine<*>,
logger: Logger,
obj: Any?,
prevCheckpointKey: SecureHash?,
resumeFunc: (ProtocolStateMachine<*>) -> Unit) {
@ -200,27 +194,27 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
// We have a request to do something: send, receive, or send-and-receive.
if (request is FiberRequest.ExpectingResponse<*>) {
// Prepare a listener on the network that runs in the background thread when we received a message.
checkpointAndSetupMessageHandler(logger, psm, request, prevCheckpointKey, serFiber)
checkpointAndSetupMessageHandler(psm, request, prevCheckpointKey, serFiber)
}
// If an object to send was provided (not null), send it now.
request.obj?.let {
val topic = "${request.topic}.${request.sessionIDForSend}"
logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" }
psm.logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" }
serviceHub.networkService.send(topic, it, request.destination!!)
}
if (request is FiberRequest.NotExpectingResponse) {
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
iterateStateMachine(psm, logger, null, prevCheckpointKey) {
iterateStateMachine(psm, null, prevCheckpointKey) {
try {
Fiber.unpark(it, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, logger, request.obj!!, request.topic, it)
logError(e, request.obj!!, request.topic, it)
}
}
}
}
psm.prepareForResumeWith(serviceHub, obj, logger, onSuspend)
psm.prepareForResumeWith(serviceHub, obj, onSuspend)
resumeFunc(psm)
@ -233,18 +227,17 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
}
}
private fun checkpointAndSetupMessageHandler(logger: Logger,
psm: ProtocolStateMachine<*>,
private fun checkpointAndSetupMessageHandler(psm: ProtocolStateMachine<*>,
request: FiberRequest.ExpectingResponse<*>,
prevCheckpointKey: SecureHash?,
serialisedFiber: ByteArray) {
executor.checkOnThread()
val topic = "${request.topic}.${request.sessionIDForReceive}"
val checkpoint = Checkpoint(serialisedFiber, logger.name, topic, request.responseType.name)
val checkpoint = Checkpoint(serialisedFiber, topic, request.responseType.name)
val curPersistedBytes = checkpoint.serialize().bits
persistCheckpoint(prevCheckpointKey, curPersistedBytes)
val newCheckpointKey = curPersistedBytes.sha256()
logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" }
psm.logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" }
val consumed = AtomicBoolean()
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
// Some assertions to ensure we don't execute on the wrong thread or get executed more than once.
@ -261,11 +254,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec
val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data))
if (!request.responseType.isInstance(obj))
throw IllegalStateException("Expected message of type ${request.responseType.name} but got ${obj.javaClass.name}", request.stackTraceInCaseOfProblems)
iterateStateMachine(psm, logger, obj, newCheckpointKey) {
iterateStateMachine(psm, obj, newCheckpointKey) {
try {
Fiber.unpark(it, QUASAR_UNBLOCKER)
} catch(e: Throwable) {
logError(e, logger, obj, topic, it)
logError(e, obj, topic, it)
}
}
}

View File

@ -13,6 +13,7 @@ import core.node.ServiceHub
import core.serialization.createKryo
import core.utilities.UntrustworthyData
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
/**
@ -23,27 +24,23 @@ import java.io.ByteArrayOutputStream
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to.
*/
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler) : Fiber<R>("protocol", scheduler) {
class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberScheduler, val loggerName: String) : Fiber<R>("protocol", scheduler) {
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient private var suspendFunc: ((result: StateMachineManager.FiberRequest, serFiber: ByteArray) -> Unit)? = null
@Transient private var resumeWithObject: Any? = null
@Transient lateinit var serviceHub: ServiceHub
@Transient lateinit var logger: Logger
init {
logic.psm = this
}
fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, logger: Logger,
suspendFunc: (StateMachineManager.FiberRequest, ByteArray) -> Unit) {
this.suspendFunc = suspendFunc
this.logger = logger
this.resumeWithObject = withObject
this.serviceHub = serviceHub
@Transient private var _logger: Logger? = null
val logger: Logger get() {
return _logger ?: run {
val l = LoggerFactory.getLogger(loggerName)
_logger = l
return l
}
}
@Transient private var _resultFuture: SettableFuture<R>? = SettableFuture.create<R>()
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> get() {
return _resultFuture ?: run {
@ -53,6 +50,17 @@ class ProtocolStateMachine<R>(val logic: ProtocolLogic<R>, scheduler: FiberSched
}
}
init {
logic.psm = this
}
fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?,
suspendFunc: (StateMachineManager.FiberRequest, ByteArray) -> Unit) {
this.suspendFunc = suspendFunc
this.resumeWithObject = withObject
this.serviceHub = serviceHub
}
@Suspendable @Suppress("UNCHECKED_CAST")
override fun run(): R {
try {