From 2e7b004eb5179b969504e17d5b31023cfac8cc28 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Mon, 9 May 2016 09:57:42 +0100 Subject: [PATCH] logger name moved into protocol state machine --- .../core/messaging/StateMachineManager.kt | 47 ++++++++----------- .../core/protocols/ProtocolStateMachine.kt | 34 +++++++++----- 2 files changed, 41 insertions(+), 40 deletions(-) diff --git a/src/main/kotlin/core/messaging/StateMachineManager.kt b/src/main/kotlin/core/messaging/StateMachineManager.kt index db6566a08b..bae66d3ec5 100644 --- a/src/main/kotlin/core/messaging/StateMachineManager.kt +++ b/src/main/kotlin/core/messaging/StateMachineManager.kt @@ -19,8 +19,6 @@ import core.serialization.serialize import core.utilities.AffinityExecutor import core.utilities.ProgressTracker import core.utilities.trace -import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.PrintWriter import java.io.StringWriter import java.util.* @@ -69,7 +67,7 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec private val metrics = serviceHub.monitoringService.metrics init { - metrics.register("Protocols.InFlight", Gauge { stateMachines.size }) + metrics.register("Protocols.InFlight", Gauge { stateMachines.size }) } private val checkpointingMeter = metrics.meter("Protocols.Checkpointing Rate") @@ -94,7 +92,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec // This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo). private class Checkpoint( val serialisedFiber: ByteArray, - val loggerName: String, val awaitingTopic: String, val awaitingObjectOfType: String // java class name ) @@ -116,7 +113,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec // so we can deserialised the nested stream that holds the fiber. val psm = deserializeFiber(checkpoint.serialisedFiber) stateMachines.add(psm.logic) - val logger = LoggerFactory.getLogger(checkpoint.loggerName) val awaitingObjectOfType = Class.forName(checkpoint.awaitingObjectOfType) val topic = checkpoint.awaitingTopic @@ -126,25 +122,25 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data)) if (!awaitingObjectOfType.isInstance(obj)) throw ClassCastException("Received message of unexpected type: ${obj.javaClass.name} vs ${awaitingObjectOfType.name}") - logger.trace { "<- $topic : message of type ${obj.javaClass.name}" } - iterateStateMachine(psm, logger, obj, checkpointKey) { + psm.logger.trace { "<- $topic : message of type ${obj.javaClass.name}" } + iterateStateMachine(psm, obj, checkpointKey) { try { Fiber.unparkDeserialized(it, scheduler) } catch(e: Throwable) { - logError(e, logger, obj, topic, it) + logError(e, obj, topic, it) } } } } } - private fun logError(e: Throwable, logger: Logger, obj: Any, topic: String, psm: ProtocolStateMachine<*>) { - logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " + + private fun logError(e: Throwable, obj: Any, topic: String, psm: ProtocolStateMachine<*>) { + psm.logger.error("Protocol state machine ${psm.javaClass.name} threw '${Throwables.getRootCause(e)}' " + "when handling a message of type ${obj.javaClass.name} on topic $topic") - if (logger.isTraceEnabled) { + if (psm.logger.isTraceEnabled) { val s = StringWriter() Throwables.getRootCause(e).printStackTrace(PrintWriter(s)) - logger.trace("Stack trace of protocol error is: $s") + psm.logger.trace("Stack trace of protocol error is: $s") } } @@ -162,12 +158,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec */ fun add(loggerName: String, logic: ProtocolLogic): ListenableFuture { try { - val logger = LoggerFactory.getLogger(loggerName) - val fiber = ProtocolStateMachine(logic, scheduler) + val fiber = ProtocolStateMachine(logic, scheduler, loggerName) // Need to add before iterating in case of immediate completion stateMachines.add(logic) executor.executeASAP { - iterateStateMachine(fiber, logger, null, null) { + iterateStateMachine(fiber, null, null) { it.start() } totalStartedProtocols.inc() @@ -191,7 +186,6 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec } private fun iterateStateMachine(psm: ProtocolStateMachine<*>, - logger: Logger, obj: Any?, prevCheckpointKey: SecureHash?, resumeFunc: (ProtocolStateMachine<*>) -> Unit) { @@ -200,27 +194,27 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec // We have a request to do something: send, receive, or send-and-receive. if (request is FiberRequest.ExpectingResponse<*>) { // Prepare a listener on the network that runs in the background thread when we received a message. - checkpointAndSetupMessageHandler(logger, psm, request, prevCheckpointKey, serFiber) + checkpointAndSetupMessageHandler(psm, request, prevCheckpointKey, serFiber) } // If an object to send was provided (not null), send it now. request.obj?.let { val topic = "${request.topic}.${request.sessionIDForSend}" - logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" } + psm.logger.trace { "-> ${request.destination}/$topic : message of type ${it.javaClass.name}" } serviceHub.networkService.send(topic, it, request.destination!!) } if (request is FiberRequest.NotExpectingResponse) { // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. - iterateStateMachine(psm, logger, null, prevCheckpointKey) { + iterateStateMachine(psm, null, prevCheckpointKey) { try { Fiber.unpark(it, QUASAR_UNBLOCKER) } catch(e: Throwable) { - logError(e, logger, request.obj!!, request.topic, it) + logError(e, request.obj!!, request.topic, it) } } } } - psm.prepareForResumeWith(serviceHub, obj, logger, onSuspend) + psm.prepareForResumeWith(serviceHub, obj, onSuspend) resumeFunc(psm) @@ -233,18 +227,17 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec } } - private fun checkpointAndSetupMessageHandler(logger: Logger, - psm: ProtocolStateMachine<*>, + private fun checkpointAndSetupMessageHandler(psm: ProtocolStateMachine<*>, request: FiberRequest.ExpectingResponse<*>, prevCheckpointKey: SecureHash?, serialisedFiber: ByteArray) { executor.checkOnThread() val topic = "${request.topic}.${request.sessionIDForReceive}" - val checkpoint = Checkpoint(serialisedFiber, logger.name, topic, request.responseType.name) + val checkpoint = Checkpoint(serialisedFiber, topic, request.responseType.name) val curPersistedBytes = checkpoint.serialize().bits persistCheckpoint(prevCheckpointKey, curPersistedBytes) val newCheckpointKey = curPersistedBytes.sha256() - logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" } + psm.logger.trace { "Waiting for message of type ${request.responseType.name} on $topic" } val consumed = AtomicBoolean() serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg -> // Some assertions to ensure we don't execute on the wrong thread or get executed more than once. @@ -261,11 +254,11 @@ class StateMachineManager(val serviceHub: ServiceHub, val executor: AffinityExec val obj: Any = THREAD_LOCAL_KRYO.get().readClassAndObject(Input(netMsg.data)) if (!request.responseType.isInstance(obj)) throw IllegalStateException("Expected message of type ${request.responseType.name} but got ${obj.javaClass.name}", request.stackTraceInCaseOfProblems) - iterateStateMachine(psm, logger, obj, newCheckpointKey) { + iterateStateMachine(psm, obj, newCheckpointKey) { try { Fiber.unpark(it, QUASAR_UNBLOCKER) } catch(e: Throwable) { - logError(e, logger, obj, topic, it) + logError(e, obj, topic, it) } } } diff --git a/src/main/kotlin/core/protocols/ProtocolStateMachine.kt b/src/main/kotlin/core/protocols/ProtocolStateMachine.kt index eb3ecd8f79..dc2f993707 100644 --- a/src/main/kotlin/core/protocols/ProtocolStateMachine.kt +++ b/src/main/kotlin/core/protocols/ProtocolStateMachine.kt @@ -13,6 +13,7 @@ import core.node.ServiceHub import core.serialization.createKryo import core.utilities.UntrustworthyData import org.slf4j.Logger +import org.slf4j.LoggerFactory import java.io.ByteArrayOutputStream /** @@ -23,27 +24,23 @@ import java.io.ByteArrayOutputStream * a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost * logic element gets to return the value that the entire state machine resolves to. */ -class ProtocolStateMachine(val logic: ProtocolLogic, scheduler: FiberScheduler) : Fiber("protocol", scheduler) { +class ProtocolStateMachine(val logic: ProtocolLogic, scheduler: FiberScheduler, val loggerName: String) : Fiber("protocol", scheduler) { + // These fields shouldn't be serialised, so they are marked @Transient. @Transient private var suspendFunc: ((result: StateMachineManager.FiberRequest, serFiber: ByteArray) -> Unit)? = null @Transient private var resumeWithObject: Any? = null @Transient lateinit var serviceHub: ServiceHub - @Transient lateinit var logger: Logger - init { - logic.psm = this - } - - fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, logger: Logger, - suspendFunc: (StateMachineManager.FiberRequest, ByteArray) -> Unit) { - this.suspendFunc = suspendFunc - this.logger = logger - this.resumeWithObject = withObject - this.serviceHub = serviceHub + @Transient private var _logger: Logger? = null + val logger: Logger get() { + return _logger ?: run { + val l = LoggerFactory.getLogger(loggerName) + _logger = l + return l + } } @Transient private var _resultFuture: SettableFuture? = SettableFuture.create() - /** This future will complete when the call method returns. */ val resultFuture: ListenableFuture get() { return _resultFuture ?: run { @@ -53,6 +50,17 @@ class ProtocolStateMachine(val logic: ProtocolLogic, scheduler: FiberSched } } + init { + logic.psm = this + } + + fun prepareForResumeWith(serviceHub: ServiceHub, withObject: Any?, + suspendFunc: (StateMachineManager.FiberRequest, ByteArray) -> Unit) { + this.suspendFunc = suspendFunc + this.resumeWithObject = withObject + this.serviceHub = serviceHub + } + @Suspendable @Suppress("UNCHECKED_CAST") override fun run(): R { try {