mirror of
https://github.com/corda/corda.git
synced 2025-06-18 23:28:21 +00:00
Merged in bugfix-for-persistent-scheduler (pull request #410)
Fixed intermittent stack serialization issue with persistent scheduler.
This commit is contained in:
@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
|
|||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
import com.r3corda.core.ThreadBox
|
import com.r3corda.core.ThreadBox
|
||||||
import com.r3corda.core.contracts.SchedulableState
|
import com.r3corda.core.contracts.SchedulableState
|
||||||
|
import com.r3corda.core.contracts.ScheduledActivity
|
||||||
import com.r3corda.core.contracts.ScheduledStateRef
|
import com.r3corda.core.contracts.ScheduledStateRef
|
||||||
import com.r3corda.core.contracts.StateRef
|
import com.r3corda.core.contracts.StateRef
|
||||||
import com.r3corda.core.node.services.SchedulerService
|
import com.r3corda.core.node.services.SchedulerService
|
||||||
@ -167,17 +168,27 @@ class NodeSchedulerService(private val database: Database,
|
|||||||
override fun call(): Unit {
|
override fun call(): Unit {
|
||||||
progressTracker.currentStep = RUNNING
|
progressTracker.currentStep = RUNNING
|
||||||
|
|
||||||
|
// Ensure we are still scheduled.
|
||||||
|
val scheduledLogic: ProtocolLogic<*>? = getScheduledLogic()
|
||||||
|
if(scheduledLogic != null) {
|
||||||
|
subProtocol(scheduledLogic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getScheduledaActivity(): ScheduledActivity? {
|
||||||
val txState = serviceHub.loadState(scheduledState.ref)
|
val txState = serviceHub.loadState(scheduledState.ref)
|
||||||
val state = txState.data as SchedulableState
|
val state = txState.data as SchedulableState
|
||||||
val scheduledActivity = try {
|
return try {
|
||||||
// This can throw as running contract code.
|
// This can throw as running contract code.
|
||||||
state.nextScheduledActivity(scheduledState.ref, scheduler.protocolLogicRefFactory)
|
state.nextScheduledActivity(scheduledState.ref, scheduler.protocolLogicRefFactory)
|
||||||
} catch(e: Exception) {
|
} catch(e: Exception) {
|
||||||
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
|
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure we are still scheduled.
|
private fun getScheduledLogic(): ProtocolLogic<*>? {
|
||||||
|
val scheduledActivity = getScheduledaActivity()
|
||||||
var scheduledLogic: ProtocolLogic<*>? = null
|
var scheduledLogic: ProtocolLogic<*>? = null
|
||||||
scheduler.mutex.locked {
|
scheduler.mutex.locked {
|
||||||
// need to remove us from those scheduled, but only if we are still next
|
// need to remove us from those scheduled, but only if we are still next
|
||||||
@ -206,9 +217,7 @@ class NodeSchedulerService(private val database: Database,
|
|||||||
recomputeEarliest()
|
recomputeEarliest()
|
||||||
scheduler.rescheduleWakeUp()
|
scheduler.rescheduleWakeUp()
|
||||||
}
|
}
|
||||||
if(scheduledLogic != null) {
|
return scheduledLogic
|
||||||
subProtocol(scheduledLogic!!)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -240,7 +240,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
actionOnSuspend(ioRequest)
|
actionOnSuspend(ioRequest)
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
// Do not throw exception again - Quasar completely bins it.
|
// Do not throw exception again - Quasar completely bins it.
|
||||||
logger.warn("Captured exception which was swallowed by Quasar", t)
|
logger.warn("Captured exception which was swallowed by Quasar for $logic at ${fiber.stackTrace.toList().joinToString("\n")}", t)
|
||||||
// TODO When error handling is introduced, look into whether we should be deleting the checkpoint and
|
// TODO When error handling is introduced, look into whether we should be deleting the checkpoint and
|
||||||
// completing the Future
|
// completing the Future
|
||||||
processException(t)
|
processException(t)
|
||||||
|
Reference in New Issue
Block a user