Fixed intermittent stack serialization issue with persistent scheduler.

Improved exception reporting when fiber serialization fails or another internal Quasar error occurs, to help diagnose future Kryo errors when checkpointing.
This commit is contained in:
rick.parker 2016-10-18 17:05:10 +01:00
parent 6a20f32a7a
commit 8a3027ffd6
2 changed files with 15 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.ThreadBox
import com.r3corda.core.contracts.SchedulableState
import com.r3corda.core.contracts.ScheduledActivity
import com.r3corda.core.contracts.ScheduledStateRef
import com.r3corda.core.contracts.StateRef
import com.r3corda.core.node.services.SchedulerService
@ -167,17 +168,27 @@ class NodeSchedulerService(private val database: Database,
/**
 * Marks the progress tracker as RUNNING and then runs the protocol returned by
 * getScheduledLogic() as a sub-protocol, if one was returned.
 */
override fun call(): Unit {
progressTracker.currentStep = RUNNING
// Ensure we are still scheduled.
val scheduledLogic: ProtocolLogic<*>? = getScheduledLogic()
// A null result means there is nothing to run; the call then finishes without starting a sub-protocol.
if(scheduledLogic != null) {
// NOTE(review): presumably the lookup is kept in a separate helper so that this reference is the only
// lookup-related local held in this frame when subProtocol is invoked - confirm before inlining anything here.
subProtocol(scheduledLogic)
}
}
/**
 * Loads the state referenced by scheduledState and asks it for its next scheduled activity.
 *
 * @return the activity reported by the state, or null when nextScheduledActivity throws an
 * Exception (the failure is logged at error level).
 */
private fun getScheduledaActivity(): ScheduledActivity? {
val txState = serviceHub.loadState(scheduledState.ref)
// Hard cast performed outside the try below: a state of another type throws here instead of being logged and mapped to null.
val state = txState.data as SchedulableState
val scheduledActivity = try {
return try {
// This can throw as running contract code.
state.nextScheduledActivity(scheduledState.ref, scheduler.protocolLogicRefFactory)
} catch(e: Exception) {
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
null
}
}
// Ensure we are still scheduled.
private fun getScheduledLogic(): ProtocolLogic<*>? {
val scheduledActivity = getScheduledaActivity()
var scheduledLogic: ProtocolLogic<*>? = null
scheduler.mutex.locked {
// need to remove us from those scheduled, but only if we are still next
@ -206,9 +217,7 @@ class NodeSchedulerService(private val database: Database,
recomputeEarliest()
scheduler.rescheduleWakeUp()
}
if(scheduledLogic != null) {
subProtocol(scheduledLogic!!)
}
return scheduledLogic
}
}
}

View File

@ -240,7 +240,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
actionOnSuspend(ioRequest)
} catch (t: Throwable) {
// Do not throw exception again - Quasar completely bins it.
logger.warn("Captured exception which was swallowed by Quasar", t)
logger.warn("Captured exception which was swallowed by Quasar for $logic at ${fiber.stackTrace.toList().joinToString("\n")}", t)
// TODO When error handling is introduced, look into whether we should be deleting the checkpoint and
// completing the Future
processException(t)