CORDA-959 Filter unschedules and remove database activity from inside mutex. (#2426)

* Filter unschedules and remove database activity from inside mutex.

* Race condition fix

* Bug fix
This commit is contained in:
Rick Parker 2018-01-26 17:44:42 +00:00 committed by GitHub
parent 4257891c98
commit e19f51d9ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 13 deletions

View File

@ -61,7 +61,7 @@ class NodeSchedulerService(private val clock: CordaClock,
private val serverThread: Executor,
private val flowLogicRefFactory: FlowLogicRefFactory,
private val log: Logger = staticLog,
scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
private val scheduledStates: MutableMap<StateRef, ScheduledStateRef> = createMap())
: SchedulerService, SingletonSerializeAsToken() {
companion object {
@ -153,13 +153,13 @@ class NodeSchedulerService(private val clock: CordaClock,
var scheduledAt: Instant = Instant.now()
)
private class InnerState(var scheduledStates: MutableMap<StateRef, ScheduledStateRef>) {
private class InnerState {
var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue({ a, b -> a.scheduledAt.compareTo(b.scheduledAt) })
var rescheduled: GuavaSettableFuture<Boolean>? = null
}
private val mutex = ThreadBox(InnerState(scheduledStates))
private val mutex = ThreadBox(InnerState())
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
fun start() {
mutex.locked {
@ -170,9 +170,9 @@ class NodeSchedulerService(private val clock: CordaClock,
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
val previousState = scheduledStates[action.ref]
scheduledStates[action.ref] = action
mutex.locked {
val previousState = scheduledStates[action.ref]
scheduledStates[action.ref] = action
val previousEarliest = scheduledStatesQueue.peek()
scheduledStatesQueue.remove(previousState)
scheduledStatesQueue.add(action)
@ -192,12 +192,15 @@ class NodeSchedulerService(private val clock: CordaClock,
override fun unscheduleStateActivity(ref: StateRef) {
log.trace { "Unschedule $ref" }
val removedAction = scheduledStates.remove(ref)
mutex.locked {
val removedAction = scheduledStates.remove(ref)
if (removedAction != null) {
scheduledStatesQueue.remove(removedAction)
unfinishedSchedules.countDown()
if (removedAction == scheduledStatesQueue.peek()) {
val wasNext = (removedAction == scheduledStatesQueue.peek())
val wasRemoved = scheduledStatesQueue.remove(removedAction)
if (wasRemoved) {
unfinishedSchedules.countDown()
}
if (wasNext) {
rescheduleWakeUp()
}
}

View File

@ -18,7 +18,7 @@ class ScheduledActivityObserver private constructor(private val schedulerService
fun install(vaultService: VaultService, schedulerService: SchedulerService, flowLogicRefFactory: FlowLogicRefFactory) {
val observer = ScheduledActivityObserver(schedulerService, flowLogicRefFactory)
vaultService.rawUpdates.subscribe { (consumed, produced) ->
consumed.forEach { schedulerService.unscheduleStateActivity(it.ref) }
consumed.forEach { if (it.state.data is SchedulableState) schedulerService.unscheduleStateActivity(it.ref) }
produced.forEach { observer.scheduleStateActivity(it) }
}
}

View File

@ -6,16 +6,16 @@ import net.corda.core.contracts.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.utilities.days
import net.corda.testing.internal.rigorousMock
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.StateLoader
import net.corda.core.utilities.days
import net.corda.node.services.api.FlowStarter
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.internal.doLookup
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.TestClock
import org.junit.Rule
import org.junit.Test
@ -163,7 +163,7 @@ class NodeSchedulerServiceTest {
val eventA = schedule(mark + 1.days)
val eventB = schedule(mark + 1.days)
scheduler.unscheduleStateActivity(eventA.stateRef)
assertWaitingFor(eventA) // XXX: Shouldn't it be waiting for eventB now?
assertWaitingFor(eventB)
testClock.advanceBy(1.days)
assertStarted(eventB)
}