diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt index 7f7ecea797..c2c9f0eb79 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt @@ -26,11 +26,8 @@ import net.corda.finance.contracts.asset.Cash import java.sql.Connection import java.sql.DatabaseMetaData import java.sql.ResultSet -import java.sql.SQLException import java.util.* import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.locks.ReentrantLock -import kotlin.concurrent.withLock /** * Pluggable interface to allow for different cash selection provider implementations @@ -61,9 +58,6 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v private val log = contextLogger() } - // coin selection retry loop counter, sleep (msecs) and lock for selecting states - private val spendLock: ReentrantLock = ReentrantLock() - /** * Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services * this method determines whether the loaded implementation is compatible and usable with the currently @@ -134,54 +128,48 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v } private fun attemptSpend(services: ServiceHub, amount: Amount, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set, withIssuerRefs: Set, stateAndRefs: MutableList>): Boolean { - spendLock.withLock { - val connection = services.jdbcSession() - try { - // we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null) - // the softLockReserve update will detect whether we try to lock states locked by others - return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs -> - stateAndRefs.clear() + val connection = services.jdbcSession() + try { + // we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null) + // the softLockReserve update will detect whether we try to lock states locked by others + return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs -> + stateAndRefs.clear() - var totalPennies = 0L - val stateRefs = mutableSetOf() - while (rs.next()) { - val txHash = SecureHash.parse(rs.getString(1)) - val index = rs.getInt(2) - val pennies = rs.getLong(3) - totalPennies = rs.getLong(4) - val rowLockId = rs.getString(5) - stateRefs.add(StateRef(txHash, index)) - log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" } - } - - if (stateRefs.isNotEmpty()) { - // TODO: future implementation to retrieve contract states from a Vault BLOB store - stateAndRefs.addAll(uncheckedCast(services.loadStates(stateRefs))) - } - - val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity - if (success) { - // we should have a minimum number of states to satisfy our selection `amount` criteria - log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") - - // With the current single threaded state machine available states are guaranteed to lock. - // TODO However, we will have to revisit these methods in the future multi-threaded. - services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet()) - } else { - log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") - } - success + var totalPennies = 0L + val stateRefs = mutableSetOf() + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val pennies = rs.getLong(3) + totalPennies = rs.getLong(4) + val rowLockId = rs.getString(5) + stateRefs.add(StateRef(txHash, index)) + log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" } } - // retry as more states may become available - } catch (e: SQLException) { - log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId] - $e. - """) - } catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine - log.warn(e.message) - // retry only if there are locked states that may become available again (or consumed with change) + if (stateRefs.isNotEmpty()) { + // TODO: future implementation to retrieve contract states from a Vault BLOB store + stateAndRefs.addAll(uncheckedCast(services.loadStates(stateRefs))) + } + + val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity + if (success) { + // we should have a minimum number of states to satisfy our selection `amount` criteria + log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") + + // With the current single threaded state machine available states are guaranteed to lock. + // TODO However, we will have to revisit these methods in the future multi-threaded. + services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet()) + } else { + log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") + } + success } + + // retry as more states may become available + } catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine + log.warn(e.message) + // retry only if there are locked states that may become available again (or consumed with change) } return false } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 7ae01af130..b3629b1c95 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -360,6 +360,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, drainingModePollPeriod = configuration.drainingModePollPeriod, nodeProperties = nodeProperties) + runOnStop += { schedulerService.join() } (serverThread as? ExecutorService)?.let { runOnStop += { // We wait here, even though any in-flight messages should have been drained away because the diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index d9440cabf8..bc69b9987c 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -21,13 +21,18 @@ import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateRef import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory -import net.corda.core.internal.* +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.join +import net.corda.core.internal.until import net.corda.core.node.ServicesForResolution import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.seconds import net.corda.core.utilities.trace import net.corda.node.CordaClock import net.corda.node.MutableClock @@ -45,7 +50,15 @@ import org.slf4j.Logger import java.io.Serializable import java.time.Duration import java.time.Instant -import java.util.concurrent.* +import java.util.concurrent.CancellationException +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionStage +import java.util.concurrent.ExecutionException +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import javax.annotation.concurrent.ThreadSafe import javax.persistence.Column import javax.persistence.EmbeddedId @@ -92,7 +105,7 @@ class NodeSchedulerService(private val clock: CordaClock, // to wait in our code, rather than Thread.sleep() or other time-based pauses. @Suspendable @VisibleForTesting - // We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name + // We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name fun awaitWithDeadline(clock: CordaClock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create()): Boolean { var nanos: Long do { @@ -151,26 +164,32 @@ class NodeSchedulerService(private val clock: CordaClock, private class InnerState { var rescheduled: GuavaSettableFuture? = null var nextScheduledAction: ScheduledStateRef? = null + var running: Boolean = true } // Used to de-duplicate flow starts in case a flow is starting but the corresponding entry hasn't been removed yet // from the database private val startingStateRefs = ConcurrentHashSet() - private val mutex = ThreadBox(InnerState()) + private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() + + // if there's nothing to do, check every minute if something fell through the cracks. + // any new state should trigger a reschedule immediately if nothing is scheduled, so I would not expect + // this to usually trigger anything. + private val idleWaitSeconds = 60.seconds + // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. fun start() { - mutex.locked { - rescheduleWakeUp() - } + schedulerTimerExecutor.execute { runLoopFunction() } } override fun scheduleStateActivity(action: ScheduledStateRef) { log.trace { "Schedule $action" } // Only increase the number of unfinished schedules if the state didn't already exist on the queue - val countUp = !schedulerRepo.merge(action) + if (!schedulerRepo.merge(action)) { + unfinishedSchedules.countUp() + } contextTransaction.onCommit { - if (countUp) unfinishedSchedules.countUp() mutex.locked { if (action.scheduledAt < nextScheduledAction?.scheduledAt ?: Instant.MAX) { // We are earliest @@ -196,41 +215,44 @@ class NodeSchedulerService(private val clock: CordaClock, } } - private val schedulerTimerExecutor = Executors.newSingleThreadExecutor() - /** - * This method first cancels the [java.util.concurrent.Future] for any pending action so that the - * [awaitWithDeadline] used below drops through without running the action. We then create a new - * [java.util.concurrent.Future] for the new action (so it too can be cancelled), and then await the arrival of the - * scheduled time. If we reach the scheduled time (the deadline) without the [java.util.concurrent.Future] being - * cancelled then we run the scheduled action. Finally we remove that action from the scheduled actions and - * recompute the next scheduled action. - */ - private fun rescheduleWakeUp() { - // Note, we already have the mutex but we need the scope again here - val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked { - rescheduled?.cancel(false) - rescheduled = GuavaSettableFuture.create() - //get the next scheduled action that isn't currently running - nextScheduledAction = schedulerRepo.getLatest(startingStateRefs.size + 1).firstOrNull { !startingStateRefs.contains(it.second) }?.second - Pair(nextScheduledAction, rescheduled!!) - } - if (scheduledState != null) { - schedulerTimerExecutor.execute { - log.trace(schedulingAsNextFormat, scheduledState) - // This will block the scheduler single thread until the scheduled time (returns false) OR - // the Future is cancelled due to rescheduling (returns true). + private fun runLoopFunction() { + while (mutex.locked { running }) { + val (scheduledState, ourRescheduledFuture) = mutex.locked { + rescheduled = GuavaSettableFuture.create() + //get the next scheduled action that isn't currently running + val deduplicate = HashSet(startingStateRefs) // Take an immutable copy to remove races with afterDatabaseCommit. + nextScheduledAction = schedulerRepo.getLatest(deduplicate.size + 1).firstOrNull { !deduplicate.contains(it.second) }?.second + Pair(nextScheduledAction, rescheduled!!) + } + log.trace(schedulingAsNextFormat, scheduledState) + // This will block the scheduler single thread until the scheduled time (returns false) OR + // the Future is cancelled due to rescheduling (returns true). + if (scheduledState != null) { if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) { log.trace { "Invoking as next $scheduledState" } onTimeReached(scheduledState) } else { log.trace { "Rescheduled $scheduledState" } } + } else { + awaitWithDeadline(clock, clock.instant() + idleWaitSeconds, ourRescheduledFuture) } + + } + } + + private fun rescheduleWakeUp() { + mutex.alreadyLocked { + rescheduled?.cancel(false) } } @VisibleForTesting internal fun join() { + mutex.locked { + running = false + rescheduleWakeUp() + } schedulerTimerExecutor.join() } @@ -320,8 +342,6 @@ class NodeSchedulerService(private val clock: CordaClock, } } } - // and schedule the next one - rescheduleWakeUp() } return scheduledFlow } diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index 3867c20b4a..af55b3c7a6 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -10,8 +10,22 @@ package net.corda.node.services.events -import com.nhaarman.mockito_kotlin.* -import net.corda.core.contracts.* +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.argForWhich +import com.nhaarman.mockito_kotlin.doAnswer +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.eq +import com.nhaarman.mockito_kotlin.same +import com.nhaarman.mockito_kotlin.timeout +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions +import com.nhaarman.mockito_kotlin.whenever +import junit.framework.Assert.fail +import net.corda.core.contracts.SchedulableState +import net.corda.core.contracts.ScheduledActivity +import net.corda.core.contracts.ScheduledStateRef +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRef @@ -33,6 +47,7 @@ import net.corda.testing.internal.spectator import net.corda.testing.node.MockServices import net.corda.testing.node.TestClock import org.junit.After +import org.junit.Before import org.junit.Ignore import org.junit.Rule import org.junit.Test @@ -42,6 +57,9 @@ import org.slf4j.Logger import java.time.Clock import java.time.Duration import java.time.Instant +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import kotlin.test.assertEquals open class NodeSchedulerServiceTestBase { @@ -78,16 +96,30 @@ open class NodeSchedulerServiceTestBase { protected val servicesForResolution = rigorousMock().also { doLookup(transactionStates).whenever(it).loadState(any()) } + + protected val traces = Collections.synchronizedList(mutableListOf()) + + @Before + fun resetTraces() { + traces.clear() + } + protected val log = spectator().also { doReturn(false).whenever(it).isTraceEnabled + doAnswer { + traces += it.getArgument(1) + }.whenever(it).trace(eq(NodeSchedulerService.schedulingAsNextFormat), any()) } - protected fun assertWaitingFor(ssr: ScheduledStateRef, total: Int = 1) { - // The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace: - verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, ssr) + protected fun assertWaitingFor(ssr: ScheduledStateRef) { + val endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5) + while (System.currentTimeMillis() < endTime) { + if (traces.lastOrNull() == ssr) return + } + fail("Was expecting to by waiting for $ssr") } - protected fun assertWaitingFor(event: Event, total: Int = 1) = assertWaitingFor(event.ssr, total) + protected fun assertWaitingFor(event: Event) = assertWaitingFor(event.ssr) protected fun assertStarted(flowLogic: FlowLogic<*>) { // Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow: @@ -98,7 +130,7 @@ open class NodeSchedulerServiceTestBase { } class MockScheduledFlowRepository : ScheduledFlowRepository { - private val map = HashMap() + private val map = ConcurrentHashMap() override fun getLatest(lookahead: Int): List> { return map.values.sortedBy { it.scheduledAt }.map { Pair(it.ref, it) } @@ -189,7 +221,7 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() { assertWaitingFor(event1) testClock.advanceBy(1.days) assertStarted(event1) - assertWaitingFor(event2, 2) + assertWaitingFor(event2) testClock.advanceBy(1.days) assertStarted(event2) } @@ -210,7 +242,6 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() { fun `test activity due in the future and schedule another for same time`() { val eventA = schedule(mark + 1.days) val eventB = schedule(mark + 1.days) - assertWaitingFor(eventA) testClock.advanceBy(1.days) assertStarted(eventA) assertStarted(eventB)