Fix coin selection with Flow-friendly sleep (#1847)

This commit is contained in:
Rick Parker 2017-10-11 14:33:20 +01:00 committed by GitHub
parent d0d0f132df
commit 3fdc69e541
10 changed files with 191 additions and 95 deletions

View File

@ -1,6 +1,7 @@
package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -16,6 +17,8 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import org.slf4j.Logger
import java.time.Duration
import java.time.Instant
/**
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
@ -43,6 +46,34 @@ abstract class FlowLogic<out T> {
/** This is where you should log things to. */
val logger: Logger get() = stateMachine.logger
companion object {
/**
* Return the outermost [FlowLogic] instance, or null if not in a flow.
*/
@JvmStatic
val currentTopLevel: FlowLogic<*>? get() = (Strand.currentStrand() as? FlowStateMachine<*>)?.logic
/**
* If on a flow, suspends the flow and only wakes it up after at least [duration] time has passed. Otherwise,
* just sleep for [duration]. This sleep function is not designed to aid scheduling, for which you should
* consider using [SchedulableState]. It is designed to aid with managing contention for which you have not
* managed via another means.
*
* Warning: long sleeps and in general long running flows are highly discouraged, as there is currently no
* support for flow migration! This method will throw an exception if you attempt to sleep for longer than
* 5 minutes.
*/
@Suspendable
@JvmStatic
@Throws(FlowException::class)
fun sleep(duration: Duration) {
if (duration.compareTo(Duration.ofMinutes(5)) > 0) {
throw FlowException("Attempt to sleep for longer than 5 minutes is not supported. Consider using SchedulableState.")
}
(Strand.currentStrand() as? FlowStateMachine<*>)?.sleepUntil(Instant.now() + duration) ?: Strand.sleep(duration.toMillis())
}
}
/**
* Returns a wrapped [java.util.UUID] object that identifies this state machine run (i.e. subflows have the same
* identifier as their parents).

View File

@ -10,6 +10,7 @@ import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData
import org.slf4j.Logger
import java.time.Instant
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
interface FlowStateMachine<R> {
@ -35,6 +36,9 @@ interface FlowStateMachine<R> {
@Suspendable
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
@Suspendable
fun sleepUntil(until: Instant)
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>)
@ -45,6 +49,7 @@ interface FlowStateMachine<R> {
@Suspendable
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>)
val logic: FlowLogic<R>
val serviceHub: ServiceHub
val logger: Logger
val id: StateMachineRunId

View File

@ -36,6 +36,12 @@ UNRELEASED
* Cordformation node building DSL can have an additional parameter `configFile` with the path to a properties file
to be appended to node.conf.
* ``FlowLogic`` now has a static method called ``sleep`` which can be used in certain circumstances to help with resolving
contention over states in flows. This should be used in place of any other sleep primitive since these are not compatible
with flows and their use will be prevented at some point in the future. Pay attention to the warnings and limitations
described in the documentation for this method. This helps resolve a bug in ``Cash`` coin selection.
A new static property `currentTopLevel` returns the top most `FlowLogic` instance, or null if not in a flow.
* ``CordaService`` annotated classes should be upgraded to take a constructor parameter of type ``AppServiceHub`` which
allows services to start flows marked with the ``StartableByService`` annotation. For backwards compatability
service classes with only ``ServiceHub`` constructors will still work.

View File

@ -32,6 +32,9 @@ dependencies {
testCompile project(':test-utils')
testCompile project(path: ':core', configuration: 'testArtifacts')
testCompile "junit:junit:$junit_version"
// AssertJ: for fluent assertions for testing
testCompile "org.assertj:assertj-core:$assertj_version"
}
configurations {

View File

@ -1,12 +1,12 @@
package net.corda.finance.contracts.asset.cash.selection
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.contracts.Amount
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
@ -34,8 +34,9 @@ class CashSelectionH2Impl : CashSelection {
}
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
private val MAX_RETRIES = 5
private val MAX_RETRIES = 8
private val RETRY_SLEEP = 100
private val RETRY_CAP = 2000
private val spendLock: ReentrantLock = ReentrantLock()
/**
@ -73,78 +74,84 @@ class CashSelectionH2Impl : CashSelection {
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
for (retryCount in 1..MAX_RETRIES) {
spendLock.withLock {
val statement = services.jdbcSession().createStatement()
try {
statement.execute("CALL SET(@t, CAST(0 AS BIGINT));")
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
// the softLockReserve update will detect whether we try to lock states locked by others
val selectJoin = """
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
AND vs.state_status = 0
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
""" +
(if (notary != null)
" AND vs.notary_name = '${notary.name}'" else "") +
(if (onlyFromIssuerParties.isNotEmpty())
" AND ccs.issuer_key IN ($issuerKeysStr)" else "") +
(if (withIssuerRefs.isNotEmpty())
" AND ccs.issuer_ref IN ($issuerRefsStr)" else "")
// Retrieve spendable state refs
val rs = statement.executeQuery(selectJoin)
if (!attemptSpend(services, amount, lockId, notary, onlyFromIssuerParties, issuerKeysStr, withIssuerRefs, issuerRefsStr, stateAndRefs)) {
log.warn("Coin selection failed on attempt $retryCount")
// TODO: revisit the back off strategy for contended spending.
if (retryCount != MAX_RETRIES) {
stateAndRefs.clear()
log.debug(selectJoin)
var totalPennies = 0L
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val stateRef = StateRef(txHash, index)
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
val pennies = rs.getLong(4)
totalPennies = rs.getLong(5)
val rowLockId = rs.getString(6)
stateAndRefs.add(StateAndRef(state, stateRef))
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
}
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
// we should have a minimum number of states to satisfy our selection `amount` criteria
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
// With the current single threaded state machine available states are guaranteed to lock.
// TODO However, we will have to revisit these methods in the future multi-threaded.
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
return stateAndRefs
}
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
// retry as more states may become available
} catch (e: SQLException) {
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
$e.
""")
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
stateAndRefs.clear()
log.warn(e.message)
// retry only if there are locked states that may become available again (or consumed with change)
} finally {
statement.close()
val durationMillis = (minOf(RETRY_SLEEP.shl(retryCount), RETRY_CAP / 2) * (1.0 + Math.random())).toInt()
FlowLogic.sleep(durationMillis.millis)
} else {
log.warn("Insufficient spendable states identified for $amount")
}
}
log.warn("Coin selection failed on attempt $retryCount")
// TODO: revisit the back off strategy for contended spending.
if (retryCount != MAX_RETRIES) {
Strand.sleep(RETRY_SLEEP * retryCount.toLong())
} else {
break
}
}
log.warn("Insufficient spendable states identified for $amount")
return stateAndRefs
}
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, issuerKeysStr: String, withIssuerRefs: Set<OpaqueBytes>, issuerRefsStr: String, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
spendLock.withLock {
val statement = services.jdbcSession().createStatement()
try {
statement.execute("CALL SET(@t, CAST(0 AS BIGINT));")
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
// the softLockReserve update will detect whether we try to lock states locked by others
val selectJoin = """
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
FROM vault_states AS vs, contract_cash_states AS ccs
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
AND vs.state_status = 0
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
""" +
(if (notary != null)
" AND vs.notary_name = '${notary.name}'" else "") +
(if (onlyFromIssuerParties.isNotEmpty())
" AND ccs.issuer_key IN ($issuerKeysStr)" else "") +
(if (withIssuerRefs.isNotEmpty())
" AND ccs.issuer_ref IN ($issuerRefsStr)" else "")
// Retrieve spendable state refs
val rs = statement.executeQuery(selectJoin)
log.debug(selectJoin)
var totalPennies = 0L
while (rs.next()) {
val txHash = SecureHash.parse(rs.getString(1))
val index = rs.getInt(2)
val stateRef = StateRef(txHash, index)
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
val pennies = rs.getLong(4)
totalPennies = rs.getLong(5)
val rowLockId = rs.getString(6)
stateAndRefs.add(StateAndRef(state, stateRef))
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
}
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
// we should have a minimum number of states to satisfy our selection `amount` criteria
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
// With the current single threaded state machine available states are guaranteed to lock.
// TODO However, we will have to revisit these methods in the future multi-threaded.
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
return true
}
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
// retry as more states may become available
} catch (e: SQLException) {
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
$e.
""")
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
log.warn(e.message)
// retry only if there are locked states that may become available again (or consumed with change)
} finally {
statement.close()
}
}
return false
}
}

View File

@ -0,0 +1,40 @@
package net.corda.finance.contracts.asset
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.flows.CashException
import net.corda.finance.flows.CashPaymentFlow
import net.corda.testing.chooseIdentity
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
class CashSelectionH2Test {
@Test
fun `check does not hold connection over retries`() {
val mockNet = MockNetwork(threadPerNode = true)
try {
val notaryNode = mockNet.createNotaryNode()
val bankA = mockNet.createNode(configOverrides = { existingConfig ->
// Tweak connections to be minimal to make this easier (1 results in a hung node during start up, so use 2 connections).
existingConfig.dataSourceProperties.setProperty("maximumPoolSize", "2")
existingConfig
})
mockNet.startNodes()
// Start more cash spends than we have connections. If spend leaks a connection on retry, we will run out of connections.
val flow1 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity()))
val flow2 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity()))
val flow3 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity()))
assertThatThrownBy { flow1.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java)
assertThatThrownBy { flow2.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java)
assertThatThrownBy { flow3.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java)
} finally {
mockNet.stopNodes()
}
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.SecureHash
import java.time.Instant
interface FlowIORequest {
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
@ -112,4 +113,9 @@ data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachine
override fun shouldResume(message: ExistingSessionMessage, session: FlowSessionInternal): Boolean = message is ErrorSessionEnd
}
data class Sleep(val until: Instant, val fiber: FlowStateMachineImpl<*>) : FlowIORequest {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")

View File

@ -12,13 +12,9 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.staticField
import net.corda.core.internal.uncheckedCast
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.*
import net.corda.node.services.api.FlowAppAuditEvent
@ -32,13 +28,15 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.file.Paths
import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.TimeUnit
class FlowPermissionException(message: String) : FlowException(message)
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>,
override val logic: FlowLogic<R>,
scheduler: FiberScheduler,
override val flowInitiator: FlowInitiator,
// Store the Party rather than the full cert path with PartyAndCertificate
@ -52,23 +50,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
* Return the current [FlowStateMachineImpl] or null if executing outside of one.
*/
fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*>
/**
* Provide a mechanism to sleep within a Strand without locking any transactional state
*/
// TODO: inlined due to an intermittent Quasar error (to be fully investigated)
@Suppress("NOTHING_TO_INLINE")
@Suspendable
inline fun sleep(millis: Long) {
if (currentStateMachine() != null) {
val db = DatabaseTransactionManager.dataSource
DatabaseTransactionManager.current().commit()
DatabaseTransactionManager.current().close()
Strand.sleep(millis)
DatabaseTransactionManager.dataSource = db
DatabaseTransactionManager.newTransaction()
} else Strand.sleep(millis)
}
}
// These fields shouldn't be serialised, so they are marked @Transient.
@ -259,6 +240,13 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
}
// Provide a mechanism to sleep within a Strand without locking any transactional state.
// This checkpoints, since we cannot undo any database writes up to this point.
@Suspendable
override fun sleepUntil(until: Instant) {
suspend(Sleep(until, this))
}
// TODO Dummy implementation of access to application specific permission controls and audit logging
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
val permissionGranted = true // TODO define permission control service on ServiceHubInternal and actually check authorization.
@ -494,6 +482,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
if (exceptionDuringSuspend == null && ioRequest is Sleep) {
// Sleep on the fiber. This will not sleep if it's in the past.
Strand.sleep(Duration.between(Instant.now(), ioRequest.until).toNanos(), TimeUnit.NANOSECONDS)
}
createTransaction()
// TODO Now that we're throwing outside of the suspend the FlowLogic can catch it. We need Quasar to terminate
// the fiber when exceptions occur inside a suspend.

View File

@ -584,6 +584,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
when (ioRequest) {
is SendRequest -> processSendRequest(ioRequest)
is WaitForLedgerCommit -> processWaitForCommitRequest(ioRequest)
is Sleep -> processSleepRequest(ioRequest)
}
}
@ -621,6 +622,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun processSleepRequest(ioRequest: Sleep) {
// Resume the fiber now we have checkpointed, so we can sleep on the Fiber.
resumeFiber(ioRequest.fiber)
}
private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null, retryId: Long? = null) {
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
?: throw IllegalArgumentException("Don't know about party $party")

View File

@ -52,7 +52,7 @@ class InteractiveShellTest {
private fun check(input: String, expected: String) {
var output: DummyFSM? = null
InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om)
assertEquals(expected, output!!.logic.a, input)
assertEquals(expected, output!!.flowA.a, input)
}
@Test
@ -83,5 +83,5 @@ class InteractiveShellTest {
@Test
fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString())
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> by mock()
class DummyFSM(val flowA: FlowA) : FlowStateMachine<Any?> by mock()
}