mirror of
https://github.com/corda/corda.git
synced 2025-03-14 08:16:32 +00:00
Merge remote-tracking branch 'remotes/open/master' into parkri-os-merge-20180529-1
This commit is contained in:
commit
34a9a7aa05
@ -26,11 +26,8 @@ import net.corda.finance.contracts.asset.Cash
|
||||
import java.sql.Connection
|
||||
import java.sql.DatabaseMetaData
|
||||
import java.sql.ResultSet
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
/**
|
||||
* Pluggable interface to allow for different cash selection provider implementations
|
||||
@ -61,9 +58,6 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
|
||||
private val spendLock: ReentrantLock = ReentrantLock()
|
||||
|
||||
/**
|
||||
* Upon dynamically loading configured Cash Selection algorithms declared in META-INF/services
|
||||
* this method determines whether the loaded implementation is compatible and usable with the currently
|
||||
@ -134,54 +128,48 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
|
||||
}
|
||||
|
||||
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
|
||||
spendLock.withLock {
|
||||
val connection = services.jdbcSession()
|
||||
try {
|
||||
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
||||
// the softLockReserve update will detect whether we try to lock states locked by others
|
||||
return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs ->
|
||||
stateAndRefs.clear()
|
||||
val connection = services.jdbcSession()
|
||||
try {
|
||||
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
||||
// the softLockReserve update will detect whether we try to lock states locked by others
|
||||
return executeQuery(connection, amount, lockId, notary, onlyFromIssuerParties, withIssuerRefs) { rs ->
|
||||
stateAndRefs.clear()
|
||||
|
||||
var totalPennies = 0L
|
||||
val stateRefs = mutableSetOf<StateRef>()
|
||||
while (rs.next()) {
|
||||
val txHash = SecureHash.parse(rs.getString(1))
|
||||
val index = rs.getInt(2)
|
||||
val pennies = rs.getLong(3)
|
||||
totalPennies = rs.getLong(4)
|
||||
val rowLockId = rs.getString(5)
|
||||
stateRefs.add(StateRef(txHash, index))
|
||||
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
|
||||
}
|
||||
|
||||
if (stateRefs.isNotEmpty()) {
|
||||
// TODO: future implementation to retrieve contract states from a Vault BLOB store
|
||||
stateAndRefs.addAll(uncheckedCast(services.loadStates(stateRefs)))
|
||||
}
|
||||
|
||||
val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity
|
||||
if (success) {
|
||||
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||
|
||||
// With the current single threaded state machine available states are guaranteed to lock.
|
||||
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
||||
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||
} else {
|
||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||
}
|
||||
success
|
||||
var totalPennies = 0L
|
||||
val stateRefs = mutableSetOf<StateRef>()
|
||||
while (rs.next()) {
|
||||
val txHash = SecureHash.parse(rs.getString(1))
|
||||
val index = rs.getInt(2)
|
||||
val pennies = rs.getLong(3)
|
||||
totalPennies = rs.getLong(4)
|
||||
val rowLockId = rs.getString(5)
|
||||
stateRefs.add(StateRef(txHash, index))
|
||||
log.trace { "ROW: $rowLockId ($lockId): ${StateRef(txHash, index)} : $pennies ($totalPennies)" }
|
||||
}
|
||||
|
||||
// retry as more states may become available
|
||||
} catch (e: SQLException) {
|
||||
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
||||
$e.
|
||||
""")
|
||||
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
||||
log.warn(e.message)
|
||||
// retry only if there are locked states that may become available again (or consumed with change)
|
||||
if (stateRefs.isNotEmpty()) {
|
||||
// TODO: future implementation to retrieve contract states from a Vault BLOB store
|
||||
stateAndRefs.addAll(uncheckedCast(services.loadStates(stateRefs)))
|
||||
}
|
||||
|
||||
val success = stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity
|
||||
if (success) {
|
||||
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||
|
||||
// With the current single threaded state machine available states are guaranteed to lock.
|
||||
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
||||
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||
} else {
|
||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||
}
|
||||
success
|
||||
}
|
||||
|
||||
// retry as more states may become available
|
||||
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
||||
log.warn(e.message)
|
||||
// retry only if there are locked states that may become available again (or consumed with change)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@ -360,6 +360,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
drainingModePollPeriod = configuration.drainingModePollPeriod,
|
||||
nodeProperties = nodeProperties)
|
||||
|
||||
runOnStop += { schedulerService.join() }
|
||||
(serverThread as? ExecutorService)?.let {
|
||||
runOnStop += {
|
||||
// We wait here, even though any in-flight messages should have been drained away because the
|
||||
|
@ -21,13 +21,18 @@ import net.corda.core.contracts.ScheduledStateRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.internal.concurrent.flatMap
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.join
|
||||
import net.corda.core.internal.until
|
||||
import net.corda.core.node.ServicesForResolution
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.CordaClock
|
||||
import net.corda.node.MutableClock
|
||||
@ -45,7 +50,15 @@ import org.slf4j.Logger
|
||||
import java.io.Serializable
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.*
|
||||
import java.util.concurrent.CancellationException
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.CompletionStage
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.EmbeddedId
|
||||
@ -92,7 +105,7 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
// to wait in our code, rather than <code>Thread.sleep()</code> or other time-based pauses.
|
||||
@Suspendable
|
||||
@VisibleForTesting
|
||||
// We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name
|
||||
// We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name
|
||||
fun awaitWithDeadline(clock: CordaClock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create<Any>()): Boolean {
|
||||
var nanos: Long
|
||||
do {
|
||||
@ -151,26 +164,32 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
private class InnerState {
|
||||
var rescheduled: GuavaSettableFuture<Boolean>? = null
|
||||
var nextScheduledAction: ScheduledStateRef? = null
|
||||
var running: Boolean = true
|
||||
}
|
||||
|
||||
// Used to de-duplicate flow starts in case a flow is starting but the corresponding entry hasn't been removed yet
|
||||
// from the database
|
||||
private val startingStateRefs = ConcurrentHashSet<ScheduledStateRef>()
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
||||
|
||||
// if there's nothing to do, check every minute if something fell through the cracks.
|
||||
// any new state should trigger a reschedule immediately if nothing is scheduled, so I would not expect
|
||||
// this to usually trigger anything.
|
||||
private val idleWaitSeconds = 60.seconds
|
||||
|
||||
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
|
||||
fun start() {
|
||||
mutex.locked {
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
schedulerTimerExecutor.execute { runLoopFunction() }
|
||||
}
|
||||
|
||||
override fun scheduleStateActivity(action: ScheduledStateRef) {
|
||||
log.trace { "Schedule $action" }
|
||||
// Only increase the number of unfinished schedules if the state didn't already exist on the queue
|
||||
val countUp = !schedulerRepo.merge(action)
|
||||
if (!schedulerRepo.merge(action)) {
|
||||
unfinishedSchedules.countUp()
|
||||
}
|
||||
contextTransaction.onCommit {
|
||||
if (countUp) unfinishedSchedules.countUp()
|
||||
mutex.locked {
|
||||
if (action.scheduledAt < nextScheduledAction?.scheduledAt ?: Instant.MAX) {
|
||||
// We are earliest
|
||||
@ -196,41 +215,44 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
}
|
||||
}
|
||||
|
||||
private val schedulerTimerExecutor = Executors.newSingleThreadExecutor()
|
||||
/**
|
||||
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
|
||||
* [awaitWithDeadline] used below drops through without running the action. We then create a new
|
||||
* [java.util.concurrent.Future] for the new action (so it too can be cancelled), and then await the arrival of the
|
||||
* scheduled time. If we reach the scheduled time (the deadline) without the [java.util.concurrent.Future] being
|
||||
* cancelled then we run the scheduled action. Finally we remove that action from the scheduled actions and
|
||||
* recompute the next scheduled action.
|
||||
*/
|
||||
private fun rescheduleWakeUp() {
|
||||
// Note, we already have the mutex but we need the scope again here
|
||||
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
|
||||
rescheduled?.cancel(false)
|
||||
rescheduled = GuavaSettableFuture.create()
|
||||
//get the next scheduled action that isn't currently running
|
||||
nextScheduledAction = schedulerRepo.getLatest(startingStateRefs.size + 1).firstOrNull { !startingStateRefs.contains(it.second) }?.second
|
||||
Pair(nextScheduledAction, rescheduled!!)
|
||||
}
|
||||
if (scheduledState != null) {
|
||||
schedulerTimerExecutor.execute {
|
||||
log.trace(schedulingAsNextFormat, scheduledState)
|
||||
// This will block the scheduler single thread until the scheduled time (returns false) OR
|
||||
// the Future is cancelled due to rescheduling (returns true).
|
||||
private fun runLoopFunction() {
|
||||
while (mutex.locked { running }) {
|
||||
val (scheduledState, ourRescheduledFuture) = mutex.locked {
|
||||
rescheduled = GuavaSettableFuture.create()
|
||||
//get the next scheduled action that isn't currently running
|
||||
val deduplicate = HashSet(startingStateRefs) // Take an immutable copy to remove races with afterDatabaseCommit.
|
||||
nextScheduledAction = schedulerRepo.getLatest(deduplicate.size + 1).firstOrNull { !deduplicate.contains(it.second) }?.second
|
||||
Pair(nextScheduledAction, rescheduled!!)
|
||||
}
|
||||
log.trace(schedulingAsNextFormat, scheduledState)
|
||||
// This will block the scheduler single thread until the scheduled time (returns false) OR
|
||||
// the Future is cancelled due to rescheduling (returns true).
|
||||
if (scheduledState != null) {
|
||||
if (!awaitWithDeadline(clock, scheduledState.scheduledAt, ourRescheduledFuture)) {
|
||||
log.trace { "Invoking as next $scheduledState" }
|
||||
onTimeReached(scheduledState)
|
||||
} else {
|
||||
log.trace { "Rescheduled $scheduledState" }
|
||||
}
|
||||
} else {
|
||||
awaitWithDeadline(clock, clock.instant() + idleWaitSeconds, ourRescheduledFuture)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private fun rescheduleWakeUp() {
|
||||
mutex.alreadyLocked {
|
||||
rescheduled?.cancel(false)
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun join() {
|
||||
mutex.locked {
|
||||
running = false
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
schedulerTimerExecutor.join()
|
||||
}
|
||||
|
||||
@ -320,8 +342,6 @@ class NodeSchedulerService(private val clock: CordaClock,
|
||||
}
|
||||
}
|
||||
}
|
||||
// and schedule the next one
|
||||
rescheduleWakeUp()
|
||||
}
|
||||
return scheduledFlow
|
||||
}
|
||||
|
@ -10,8 +10,22 @@
|
||||
|
||||
package net.corda.node.services.events
|
||||
|
||||
import com.nhaarman.mockito_kotlin.*
|
||||
import net.corda.core.contracts.*
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.argForWhich
|
||||
import com.nhaarman.mockito_kotlin.doAnswer
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.eq
|
||||
import com.nhaarman.mockito_kotlin.same
|
||||
import com.nhaarman.mockito_kotlin.timeout
|
||||
import com.nhaarman.mockito_kotlin.verify
|
||||
import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import junit.framework.Assert.fail
|
||||
import net.corda.core.contracts.SchedulableState
|
||||
import net.corda.core.contracts.ScheduledActivity
|
||||
import net.corda.core.contracts.ScheduledStateRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRef
|
||||
@ -33,6 +47,7 @@ import net.corda.testing.internal.spectator
|
||||
import net.corda.testing.node.MockServices
|
||||
import net.corda.testing.node.TestClock
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
@ -42,6 +57,9 @@ import org.slf4j.Logger
|
||||
import java.time.Clock
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
open class NodeSchedulerServiceTestBase {
|
||||
@ -78,16 +96,30 @@ open class NodeSchedulerServiceTestBase {
|
||||
protected val servicesForResolution = rigorousMock<ServicesForResolution>().also {
|
||||
doLookup(transactionStates).whenever(it).loadState(any())
|
||||
}
|
||||
|
||||
protected val traces = Collections.synchronizedList(mutableListOf<ScheduledStateRef>())
|
||||
|
||||
@Before
|
||||
fun resetTraces() {
|
||||
traces.clear()
|
||||
}
|
||||
|
||||
protected val log = spectator<Logger>().also {
|
||||
doReturn(false).whenever(it).isTraceEnabled
|
||||
doAnswer {
|
||||
traces += it.getArgument<ScheduledStateRef>(1)
|
||||
}.whenever(it).trace(eq(NodeSchedulerService.schedulingAsNextFormat), any<Object>())
|
||||
}
|
||||
|
||||
protected fun assertWaitingFor(ssr: ScheduledStateRef, total: Int = 1) {
|
||||
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
|
||||
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, ssr)
|
||||
protected fun assertWaitingFor(ssr: ScheduledStateRef) {
|
||||
val endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5)
|
||||
while (System.currentTimeMillis() < endTime) {
|
||||
if (traces.lastOrNull() == ssr) return
|
||||
}
|
||||
fail("Was expecting to by waiting for $ssr")
|
||||
}
|
||||
|
||||
protected fun assertWaitingFor(event: Event, total: Int = 1) = assertWaitingFor(event.ssr, total)
|
||||
protected fun assertWaitingFor(event: Event) = assertWaitingFor(event.ssr)
|
||||
|
||||
protected fun assertStarted(flowLogic: FlowLogic<*>) {
|
||||
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
|
||||
@ -98,7 +130,7 @@ open class NodeSchedulerServiceTestBase {
|
||||
}
|
||||
|
||||
class MockScheduledFlowRepository : ScheduledFlowRepository {
|
||||
private val map = HashMap<StateRef, ScheduledStateRef>()
|
||||
private val map = ConcurrentHashMap<StateRef, ScheduledStateRef>()
|
||||
|
||||
override fun getLatest(lookahead: Int): List<Pair<StateRef, ScheduledStateRef>> {
|
||||
return map.values.sortedBy { it.scheduledAt }.map { Pair(it.ref, it) }
|
||||
@ -189,7 +221,7 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
|
||||
assertWaitingFor(event1)
|
||||
testClock.advanceBy(1.days)
|
||||
assertStarted(event1)
|
||||
assertWaitingFor(event2, 2)
|
||||
assertWaitingFor(event2)
|
||||
testClock.advanceBy(1.days)
|
||||
assertStarted(event2)
|
||||
}
|
||||
@ -210,7 +242,6 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
|
||||
fun `test activity due in the future and schedule another for same time`() {
|
||||
val eventA = schedule(mark + 1.days)
|
||||
val eventB = schedule(mark + 1.days)
|
||||
assertWaitingFor(eventA)
|
||||
testClock.advanceBy(1.days)
|
||||
assertStarted(eventA)
|
||||
assertStarted(eventB)
|
||||
|
Loading…
x
Reference in New Issue
Block a user