CORDA-1524 Fix NodeSchedulerService (#3238)

* Simplify flow scheduler

* Fix mutex and count up on the unfinished flows latch

* Fix missing import

* Some code layout shifting

* Undo automated change

* minor format changes from code review.

* Fix up tests to work with changes to scheduler

* Formatting fixes

* Remove commented out line.

* Fix race condition.

* We were not waiting for the scheduler to stop, or indeed stopping it.
This commit is contained in:
Christian Sailer 2018-05-29 12:20:30 +01:00 committed by Rick Parker
parent accb9eb5b3
commit 6791ea800d
3 changed files with 95 additions and 43 deletions

View File

@ -330,6 +330,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties)
runOnStop += { schedulerService.join() }
(serverThread as? ExecutorService)?.let {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the

View File

@ -11,13 +11,18 @@ import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.internal.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.join
import net.corda.core.internal.until
import net.corda.core.node.ServicesForResolution
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.core.utilities.trace
import net.corda.node.CordaClock
import net.corda.node.MutableClock
@ -35,7 +40,15 @@ import org.slf4j.Logger
import java.io.Serializable
import java.time.Duration
import java.time.Instant
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.Column
import javax.persistence.EmbeddedId
@ -141,26 +154,32 @@ class NodeSchedulerService(private val clock: CordaClock,
/**
 * Mutable state shared between the scheduler loop thread and callers of the service.
 * The instance created in this class is wrapped in a ThreadBox (`mutex`) and is read and written through it.
 */
private class InnerState {
// Future the scheduler loop waits on alongside the deadline; cancelling it (see rescheduleWakeUp) ends the
// current wait early so the loop can pick the next action again.
var rescheduled: GuavaSettableFuture<Boolean>? = null
// The action most recently selected by the scheduler loop to wait for, or null when none was found.
var nextScheduledAction: ScheduledStateRef? = null
// Loop condition of runLoopFunction; join() sets it to false to make the loop exit.
var running: Boolean = true
}
// Used to de-duplicate flow starts in case a flow is starting but the corresponding entry hasn't been removed yet
// from the database
private val startingStateRefs = ConcurrentHashSet<ScheduledStateRef>()
private val mutex = ThreadBox(InnerState())
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
// If there is nothing to do, wake up every minute to check whether something fell through the cracks.
// Any new state should trigger a reschedule immediately if nothing is scheduled, so this periodic check
// is not expected to find anything in normal operation.
private val idleWaitSeconds = 60.seconds
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
fun start() {
mutex.locked {
rescheduleWakeUp()
}
schedulerTimerExecutor.execute { runLoopFunction() }
}
override fun scheduleStateActivity(action: ScheduledStateRef) {
log.trace { "Schedule $action" }
// Only increase the number of unfinished schedules if the state didn't already exist on the queue
val countUp = !schedulerRepo.merge(action)
if (!schedulerRepo.merge(action)) {
unfinishedSchedules.countUp()
}
contextTransaction.onCommit {
if (countUp) unfinishedSchedules.countUp()
mutex.locked {
if (action.scheduledAt < nextScheduledAction?.scheduledAt ?: Instant.MAX) {
// We are earliest
@ -186,41 +205,44 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
/**
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
* [awaitWithDeadline] used below drops through without running the action. We then create a new
* [java.util.concurrent.Future] for the new action (so it too can be cancelled), and then await the arrival of the
* scheduled time. If we reach the scheduled time (the deadline) without the [java.util.concurrent.Future] being
* cancelled then we run the scheduled action. Finally we remove that action from the scheduled actions and
* recompute the next scheduled action.
*/
private fun rescheduleWakeUp() {
// Note, we already have the mutex but we need the scope again here
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
rescheduled?.cancel(false)
private fun runLoopFunction() {
while (mutex.locked { running }) {
val (scheduledState, ourRescheduledFuture) = mutex.locked {
rescheduled = GuavaSettableFuture.create()
//get the next scheduled action that isn't currently running
nextScheduledAction = schedulerRepo.getLatest(startingStateRefs.size + 1).firstOrNull { !startingStateRefs.contains(it.second) }?.second
val deduplicate = HashSet(startingStateRefs) // Take an immutable copy to remove races with afterDatabaseCommit.
nextScheduledAction = schedulerRepo.getLatest(deduplicate.size + 1).firstOrNull { !deduplicate.contains(it.second) }?.second
Pair(nextScheduledAction, rescheduled!!)
}
if (scheduledState != null) {
schedulerTimerExecutor.execute {
log.trace(schedulingAsNextFormat, scheduledState)
// This will block the scheduler single thread until the scheduled time (returns false) OR
// the Future is cancelled due to rescheduling (returns true).
if (scheduledState != null) {
if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) {
log.trace { "Invoking as next $scheduledState" }
onTimeReached(scheduledState)
} else {
log.trace { "Rescheduled $scheduledState" }
}
} else {
awaitWithDeadline(clock, clock.instant() + idleWaitSeconds, ourRescheduledFuture)
}
}
}
/**
 * Cancels the pending `rescheduled` future, if there is one, so that a wait on that future in the
 * scheduler loop returns early instead of running to its deadline.
 *
 * Accesses the state via `alreadyLocked`, so the caller is expected to hold `mutex` already
 * (as join() does by calling this inside `mutex.locked`).
 */
private fun rescheduleWakeUp() {
mutex.alreadyLocked {
// cancel(false): do not request thread interruption, only mark the future as cancelled.
rescheduled?.cancel(false)
}
}
/**
 * Stops the scheduler loop and waits for its executor.
 *
 * Under the mutex, clears `running` (the loop condition of runLoopFunction) and cancels the pending
 * wake-up future so a loop iteration that is currently waiting returns promptly. Then joins
 * `schedulerTimerExecutor`.
 */
@VisibleForTesting
internal fun join() {
mutex.locked {
// Clear the flag before cancelling, so the loop sees running == false when it wakes up.
running = false
rescheduleWakeUp()
}
// NOTE(review): presumably shuts the executor down and blocks until it terminates - confirm against
// the net.corda.core.internal.join extension.
schedulerTimerExecutor.join()
}
@ -310,8 +332,6 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
}
// and schedule the next one
rescheduleWakeUp()
}
return scheduledFlow
}

View File

@ -1,7 +1,21 @@
package net.corda.node.services.events
import com.nhaarman.mockito_kotlin.*
import net.corda.core.contracts.*
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.argForWhich
import com.nhaarman.mockito_kotlin.doAnswer
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.eq
import com.nhaarman.mockito_kotlin.same
import com.nhaarman.mockito_kotlin.timeout
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions
import com.nhaarman.mockito_kotlin.whenever
import junit.framework.Assert.fail
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef
@ -23,6 +37,7 @@ import net.corda.testing.internal.spectator
import net.corda.testing.node.MockServices
import net.corda.testing.node.TestClock
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
@ -32,6 +47,9 @@ import org.slf4j.Logger
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
open class NodeSchedulerServiceTestBase {
@ -68,16 +86,30 @@ open class NodeSchedulerServiceTestBase {
protected val servicesForResolution = rigorousMock<ServicesForResolution>().also {
doLookup(transactionStates).whenever(it).loadState(any())
}
protected val traces = Collections.synchronizedList(mutableListOf<ScheduledStateRef>())
/** Empties [traces] before each test so entries recorded by an earlier test cannot satisfy a later assertion. */
@Before
fun resetTraces() {
traces.clear()
}
// Logger test double that records, in [traces], every state the scheduler logs as "scheduling as next".
protected val log = spectator<Logger>().also {
// Report trace logging as disabled.
// NOTE(review): presumably this keeps the lazy `log.trace { ... }` calls from evaluating - confirm.
doReturn(false).whenever(it).isTraceEnabled
// For trace(schedulingAsNextFormat, <arg>) calls, append the argument at index 1 (the logged
// ScheduledStateRef) to traces, preserving call order.
doAnswer {
traces += it.getArgument<ScheduledStateRef>(1)
}.whenever(it).trace(eq(NodeSchedulerService.schedulingAsNextFormat), any<Object>())
}
protected fun assertWaitingFor(ssr: ScheduledStateRef, total: Int = 1) {
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, ssr)
/**
 * Asserts that the scheduler ends up waiting for [ssr].
 *
 * Polls [traces] for up to five seconds and returns as soon as the most recently recorded entry
 * equals [ssr]. Polling is needed because the test thread races the scheduler thread that writes
 * the trace. Fails the test if the deadline passes without a match.
 *
 * @param ssr the scheduled state the scheduler is expected to be waiting for.
 */
protected fun assertWaitingFor(ssr: ScheduledStateRef) {
    val endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5)
    while (System.currentTimeMillis() < endTime) {
        // Only the latest trace matters: earlier entries describe waits that have been superseded.
        if (traces.lastOrNull() == ssr) return
    }
    fail("Was expecting to be waiting for $ssr")
}
protected fun assertWaitingFor(event: Event, total: Int = 1) = assertWaitingFor(event.ssr, total)
protected fun assertWaitingFor(event: Event) = assertWaitingFor(event.ssr)
protected fun assertStarted(flowLogic: FlowLogic<*>) {
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
@ -88,7 +120,7 @@ open class NodeSchedulerServiceTestBase {
}
class MockScheduledFlowRepository : ScheduledFlowRepository {
private val map = HashMap<StateRef, ScheduledStateRef>()
private val map = ConcurrentHashMap<StateRef, ScheduledStateRef>()
override fun getLatest(lookahead: Int): List<Pair<StateRef, ScheduledStateRef>> {
return map.values.sortedBy { it.scheduledAt }.map { Pair(it.ref, it) }
@ -179,7 +211,7 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
assertWaitingFor(event1)
testClock.advanceBy(1.days)
assertStarted(event1)
assertWaitingFor(event2, 2)
assertWaitingFor(event2)
testClock.advanceBy(1.days)
assertStarted(event2)
}
@ -200,7 +232,6 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
fun `test activity due in the future and schedule another for same time`() {
val eventA = schedule(mark + 1.days)
val eventB = schedule(mark + 1.days)
assertWaitingFor(eventA)
testClock.advanceBy(1.days)
assertStarted(eventA)
assertStarted(eventB)