mirror of
https://github.com/corda/corda.git
synced 2025-06-02 23:50:54 +00:00
Fix coin selection with Flow-friendly sleep (#1847)
This commit is contained in:
parent
76f0fbef8d
commit
310f0daa37
@ -1,6 +1,7 @@
|
|||||||
package net.corda.core.flows
|
package net.corda.core.flows
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import co.paralleluniverse.strands.Strand
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.identity.PartyAndCertificate
|
import net.corda.core.identity.PartyAndCertificate
|
||||||
@ -15,6 +16,8 @@ import net.corda.core.utilities.ProgressTracker
|
|||||||
import net.corda.core.utilities.UntrustworthyData
|
import net.corda.core.utilities.UntrustworthyData
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
|
import java.time.Duration
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
|
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
|
||||||
@ -42,6 +45,34 @@ abstract class FlowLogic<out T> {
|
|||||||
/** This is where you should log things to. */
|
/** This is where you should log things to. */
|
||||||
val logger: Logger get() = stateMachine.logger
|
val logger: Logger get() = stateMachine.logger
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
/**
|
||||||
|
* Return the outermost [FlowLogic] instance, or null if not in a flow.
|
||||||
|
*/
|
||||||
|
@JvmStatic
|
||||||
|
val currentTopLevel: FlowLogic<*>? get() = (Strand.currentStrand() as? FlowStateMachine<*>)?.logic
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If on a flow, suspends the flow and only wakes it up after at least [duration] time has passed. Otherwise,
|
||||||
|
* just sleep for [duration]. This sleep function is not designed to aid scheduling, for which you should
|
||||||
|
* consider using [SchedulableState]. It is designed to aid with managing contention for which you have not
|
||||||
|
* managed via another means.
|
||||||
|
*
|
||||||
|
* Warning: long sleeps and in general long running flows are highly discouraged, as there is currently no
|
||||||
|
* support for flow migration! This method will throw an exception if you attempt to sleep for longer than
|
||||||
|
* 5 minutes.
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
@JvmStatic
|
||||||
|
@Throws(FlowException::class)
|
||||||
|
fun sleep(duration: Duration) {
|
||||||
|
if (duration.compareTo(Duration.ofMinutes(5)) > 0) {
|
||||||
|
throw FlowException("Attempt to sleep for longer than 5 minutes is not supported. Consider using SchedulableState.")
|
||||||
|
}
|
||||||
|
(Strand.currentStrand() as? FlowStateMachine<*>)?.sleepUntil(Instant.now() + duration) ?: Strand.sleep(duration.toMillis())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a wrapped [java.util.UUID] object that identifies this state machine run (i.e. subflows have the same
|
* Returns a wrapped [java.util.UUID] object that identifies this state machine run (i.e. subflows have the same
|
||||||
* identifier as their parents).
|
* identifier as their parents).
|
||||||
|
@ -10,6 +10,7 @@ import net.corda.core.node.ServiceHub
|
|||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.core.utilities.UntrustworthyData
|
import net.corda.core.utilities.UntrustworthyData
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
|
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
|
||||||
interface FlowStateMachine<R> {
|
interface FlowStateMachine<R> {
|
||||||
@ -35,6 +36,9 @@ interface FlowStateMachine<R> {
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
|
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
fun sleepUntil(until: Instant)
|
||||||
|
|
||||||
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>): Unit
|
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>): Unit
|
||||||
|
|
||||||
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit
|
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>): Unit
|
||||||
@ -45,6 +49,7 @@ interface FlowStateMachine<R> {
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): Unit
|
fun persistFlowStackSnapshot(flowClass: Class<out FlowLogic<*>>): Unit
|
||||||
|
|
||||||
|
val logic: FlowLogic<R>
|
||||||
val serviceHub: ServiceHub
|
val serviceHub: ServiceHub
|
||||||
val logger: Logger
|
val logger: Logger
|
||||||
val id: StateMachineRunId
|
val id: StateMachineRunId
|
||||||
|
@ -4,12 +4,20 @@ Changelog
|
|||||||
Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code
|
Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code
|
||||||
from the previous milestone release.
|
from the previous milestone release.
|
||||||
|
|
||||||
UNRELEASED
|
.. _changelog_v2:
|
||||||
----------
|
|
||||||
|
Release 2.0
|
||||||
|
-----------
|
||||||
|
|
||||||
* ``OpaqueBytes.bytes`` now returns a clone of its underlying ``ByteArray``, and has been redeclared as ``final``.
|
* ``OpaqueBytes.bytes`` now returns a clone of its underlying ``ByteArray``, and has been redeclared as ``final``.
|
||||||
This is a minor change to the public API, but is required to ensure that classes like ``SecureHash`` are immutable.
|
This is a minor change to the public API, but is required to ensure that classes like ``SecureHash`` are immutable.
|
||||||
|
|
||||||
|
* ``FlowLogic`` now has a static method called ``sleep`` which can be used in certain circumstances to help with resolving
|
||||||
|
contention over states in flows. This should be used in place of any other sleep primitive since these are not compatible
|
||||||
|
with flows and their use will be prevented at some point in the future. Pay attention to the warnings and limitations
|
||||||
|
described in the documentation for this method. This helps resolve a bug in ``Cash`` coin selection.
|
||||||
|
A new static property `currentTopLevel` returns the top most `FlowLogic` instance, or null if not in a flow.
|
||||||
|
|
||||||
.. _changelog_v1:
|
.. _changelog_v1:
|
||||||
|
|
||||||
Release 1.0
|
Release 1.0
|
||||||
|
@ -19,6 +19,9 @@ dependencies {
|
|||||||
testCompile project(':test-utils')
|
testCompile project(':test-utils')
|
||||||
testCompile project(path: ':core', configuration: 'testArtifacts')
|
testCompile project(path: ':core', configuration: 'testArtifacts')
|
||||||
testCompile "junit:junit:$junit_version"
|
testCompile "junit:junit:$junit_version"
|
||||||
|
|
||||||
|
// AssertJ: for fluent assertions for testing
|
||||||
|
testCompile "org.assertj:assertj-core:$assertj_version"
|
||||||
}
|
}
|
||||||
|
|
||||||
configurations {
|
configurations {
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
package net.corda.finance.contracts.asset.cash.selection
|
package net.corda.finance.contracts.asset.cash.selection
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import co.paralleluniverse.strands.Strand
|
|
||||||
import net.corda.core.contracts.Amount
|
import net.corda.core.contracts.Amount
|
||||||
import net.corda.core.contracts.StateAndRef
|
import net.corda.core.contracts.StateAndRef
|
||||||
import net.corda.core.contracts.StateRef
|
import net.corda.core.contracts.StateRef
|
||||||
import net.corda.core.contracts.TransactionState
|
import net.corda.core.contracts.TransactionState
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.node.ServiceHub
|
import net.corda.core.node.ServiceHub
|
||||||
@ -34,8 +34,9 @@ class CashSelectionH2Impl : CashSelection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
|
// coin selection retry loop counter, sleep (msecs) and lock for selecting states
|
||||||
private val MAX_RETRIES = 5
|
private val MAX_RETRIES = 8
|
||||||
private val RETRY_SLEEP = 100
|
private val RETRY_SLEEP = 100
|
||||||
|
private val RETRY_CAP = 2000
|
||||||
private val spendLock: ReentrantLock = ReentrantLock()
|
private val spendLock: ReentrantLock = ReentrantLock()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -73,78 +74,84 @@ class CashSelectionH2Impl : CashSelection {
|
|||||||
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
|
// 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries)
|
||||||
|
|
||||||
for (retryCount in 1..MAX_RETRIES) {
|
for (retryCount in 1..MAX_RETRIES) {
|
||||||
|
if (!attemptSpend(services, amount, lockId, notary, onlyFromIssuerParties, issuerKeysStr, withIssuerRefs, issuerRefsStr, stateAndRefs)) {
|
||||||
spendLock.withLock {
|
log.warn("Coin selection failed on attempt $retryCount")
|
||||||
val statement = services.jdbcSession().createStatement()
|
// TODO: revisit the back off strategy for contended spending.
|
||||||
try {
|
if (retryCount != MAX_RETRIES) {
|
||||||
statement.execute("CALL SET(@t, CAST(0 AS BIGINT));")
|
|
||||||
|
|
||||||
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
|
||||||
// the softLockReserve update will detect whether we try to lock states locked by others
|
|
||||||
val selectJoin = """
|
|
||||||
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
|
|
||||||
FROM vault_states AS vs, contract_cash_states AS ccs
|
|
||||||
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
|
|
||||||
AND vs.state_status = 0
|
|
||||||
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
|
|
||||||
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
|
|
||||||
""" +
|
|
||||||
(if (notary != null)
|
|
||||||
" AND vs.notary_name = '${notary.name}'" else "") +
|
|
||||||
(if (onlyFromIssuerParties.isNotEmpty())
|
|
||||||
" AND ccs.issuer_key IN ($issuerKeysStr)" else "") +
|
|
||||||
(if (withIssuerRefs.isNotEmpty())
|
|
||||||
" AND ccs.issuer_ref IN ($issuerRefsStr)" else "")
|
|
||||||
|
|
||||||
// Retrieve spendable state refs
|
|
||||||
val rs = statement.executeQuery(selectJoin)
|
|
||||||
stateAndRefs.clear()
|
stateAndRefs.clear()
|
||||||
log.debug(selectJoin)
|
val durationMillis = (minOf(RETRY_SLEEP.shl(retryCount), RETRY_CAP / 2) * (1.0 + Math.random())).toInt()
|
||||||
var totalPennies = 0L
|
FlowLogic.sleep(durationMillis.millis)
|
||||||
while (rs.next()) {
|
} else {
|
||||||
val txHash = SecureHash.parse(rs.getString(1))
|
log.warn("Insufficient spendable states identified for $amount")
|
||||||
val index = rs.getInt(2)
|
|
||||||
val stateRef = StateRef(txHash, index)
|
|
||||||
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
|
|
||||||
val pennies = rs.getLong(4)
|
|
||||||
totalPennies = rs.getLong(5)
|
|
||||||
val rowLockId = rs.getString(6)
|
|
||||||
stateAndRefs.add(StateAndRef(state, stateRef))
|
|
||||||
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
|
|
||||||
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
|
||||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
|
||||||
|
|
||||||
// With the current single threaded state machine available states are guaranteed to lock.
|
|
||||||
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
|
||||||
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
|
||||||
return stateAndRefs
|
|
||||||
}
|
|
||||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
|
||||||
// retry as more states may become available
|
|
||||||
} catch (e: SQLException) {
|
|
||||||
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
|
||||||
$e.
|
|
||||||
""")
|
|
||||||
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
|
||||||
stateAndRefs.clear()
|
|
||||||
log.warn(e.message)
|
|
||||||
// retry only if there are locked states that may become available again (or consumed with change)
|
|
||||||
} finally {
|
|
||||||
statement.close()
|
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
|
break
|
||||||
log.warn("Coin selection failed on attempt $retryCount")
|
|
||||||
// TODO: revisit the back off strategy for contended spending.
|
|
||||||
if (retryCount != MAX_RETRIES) {
|
|
||||||
Strand.sleep(RETRY_SLEEP * retryCount.toLong())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.warn("Insufficient spendable states identified for $amount")
|
|
||||||
return stateAndRefs
|
return stateAndRefs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun attemptSpend(services: ServiceHub, amount: Amount<Currency>, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set<AbstractParty>, issuerKeysStr: String, withIssuerRefs: Set<OpaqueBytes>, issuerRefsStr: String, stateAndRefs: MutableList<StateAndRef<Cash.State>>): Boolean {
|
||||||
|
spendLock.withLock {
|
||||||
|
val statement = services.jdbcSession().createStatement()
|
||||||
|
try {
|
||||||
|
statement.execute("CALL SET(@t, CAST(0 AS BIGINT));")
|
||||||
|
|
||||||
|
// we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null)
|
||||||
|
// the softLockReserve update will detect whether we try to lock states locked by others
|
||||||
|
val selectJoin = """
|
||||||
|
SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id
|
||||||
|
FROM vault_states AS vs, contract_cash_states AS ccs
|
||||||
|
WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index
|
||||||
|
AND vs.state_status = 0
|
||||||
|
AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity}
|
||||||
|
AND (vs.lock_id = '$lockId' OR vs.lock_id is null)
|
||||||
|
""" +
|
||||||
|
(if (notary != null)
|
||||||
|
" AND vs.notary_name = '${notary.name}'" else "") +
|
||||||
|
(if (onlyFromIssuerParties.isNotEmpty())
|
||||||
|
" AND ccs.issuer_key IN ($issuerKeysStr)" else "") +
|
||||||
|
(if (withIssuerRefs.isNotEmpty())
|
||||||
|
" AND ccs.issuer_ref IN ($issuerRefsStr)" else "")
|
||||||
|
|
||||||
|
// Retrieve spendable state refs
|
||||||
|
val rs = statement.executeQuery(selectJoin)
|
||||||
|
log.debug(selectJoin)
|
||||||
|
var totalPennies = 0L
|
||||||
|
while (rs.next()) {
|
||||||
|
val txHash = SecureHash.parse(rs.getString(1))
|
||||||
|
val index = rs.getInt(2)
|
||||||
|
val stateRef = StateRef(txHash, index)
|
||||||
|
val state = rs.getBytes(3).deserialize<TransactionState<Cash.State>>(context = SerializationDefaults.STORAGE_CONTEXT)
|
||||||
|
val pennies = rs.getLong(4)
|
||||||
|
totalPennies = rs.getLong(5)
|
||||||
|
val rowLockId = rs.getString(6)
|
||||||
|
stateAndRefs.add(StateAndRef(state, stateRef))
|
||||||
|
log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) {
|
||||||
|
// we should have a minimum number of states to satisfy our selection `amount` criteria
|
||||||
|
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||||
|
|
||||||
|
// With the current single threaded state machine available states are guaranteed to lock.
|
||||||
|
// TODO However, we will have to revisit these methods in the future multi-threaded.
|
||||||
|
services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||||
|
// retry as more states may become available
|
||||||
|
} catch (e: SQLException) {
|
||||||
|
log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId]
|
||||||
|
$e.
|
||||||
|
""")
|
||||||
|
} catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine
|
||||||
|
log.warn(e.message)
|
||||||
|
// retry only if there are locked states that may become available again (or consumed with change)
|
||||||
|
} finally {
|
||||||
|
statement.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
@ -0,0 +1,40 @@
|
|||||||
|
package net.corda.finance.contracts.asset
|
||||||
|
|
||||||
|
import net.corda.core.utilities.getOrThrow
|
||||||
|
import net.corda.finance.DOLLARS
|
||||||
|
import net.corda.finance.flows.CashException
|
||||||
|
import net.corda.finance.flows.CashPaymentFlow
|
||||||
|
import net.corda.testing.chooseIdentity
|
||||||
|
import net.corda.testing.node.MockNetwork
|
||||||
|
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||||
|
import org.junit.Test
|
||||||
|
|
||||||
|
|
||||||
|
class CashSelectionH2Test {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `check does not hold connection over retries`() {
|
||||||
|
val mockNet = MockNetwork(threadPerNode = true)
|
||||||
|
try {
|
||||||
|
val notaryNode = mockNet.createNotaryNode()
|
||||||
|
val bankA = mockNet.createNode(configOverrides = { existingConfig ->
|
||||||
|
// Tweak connections to be minimal to make this easier (1 results in a hung node during start up, so use 2 connections).
|
||||||
|
existingConfig.dataSourceProperties.setProperty("maximumPoolSize", "2")
|
||||||
|
existingConfig
|
||||||
|
})
|
||||||
|
|
||||||
|
mockNet.startNodes()
|
||||||
|
|
||||||
|
// Start more cash spends than we have connections. If spend leaks a connection on retry, we will run out of connections.
|
||||||
|
val flow1 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity()))
|
||||||
|
val flow2 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity()))
|
||||||
|
val flow3 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity()))
|
||||||
|
|
||||||
|
assertThatThrownBy { flow1.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java)
|
||||||
|
assertThatThrownBy { flow2.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java)
|
||||||
|
assertThatThrownBy { flow3.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java)
|
||||||
|
} finally {
|
||||||
|
mockNet.stopNodes()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,7 @@
|
|||||||
package net.corda.node.services.statemachine
|
package net.corda.node.services.statemachine
|
||||||
|
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
interface FlowIORequest {
|
interface FlowIORequest {
|
||||||
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
||||||
@ -48,4 +49,9 @@ data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachine
|
|||||||
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
|
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data class Sleep(val until: Instant, val fiber: FlowStateMachineImpl<*>) : FlowIORequest {
|
||||||
|
@Transient
|
||||||
|
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
|
||||||
|
}
|
||||||
|
|
||||||
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
|
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
|
||||||
|
@ -12,8 +12,7 @@ import net.corda.core.crypto.random63BitValue
|
|||||||
import net.corda.core.flows.*
|
import net.corda.core.flows.*
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.identity.PartyAndCertificate
|
import net.corda.core.identity.PartyAndCertificate
|
||||||
import net.corda.core.internal.FlowStateMachine
|
import net.corda.core.internal.*
|
||||||
import net.corda.core.internal.abbreviate
|
|
||||||
import net.corda.core.internal.concurrent.OpenFuture
|
import net.corda.core.internal.concurrent.OpenFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.internal.isRegularFile
|
import net.corda.core.internal.isRegularFile
|
||||||
@ -32,13 +31,15 @@ import org.slf4j.Logger
|
|||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
import java.sql.SQLException
|
import java.sql.SQLException
|
||||||
|
import java.time.Duration
|
||||||
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
class FlowPermissionException(message: String) : FlowException(message)
|
class FlowPermissionException(message: String) : FlowException(message)
|
||||||
|
|
||||||
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||||
val logic: FlowLogic<R>,
|
override val logic: FlowLogic<R>,
|
||||||
scheduler: FiberScheduler,
|
scheduler: FiberScheduler,
|
||||||
override val flowInitiator: FlowInitiator,
|
override val flowInitiator: FlowInitiator,
|
||||||
// Store the Party rather than the full cert path with PartyAndCertificate
|
// Store the Party rather than the full cert path with PartyAndCertificate
|
||||||
@ -52,23 +53,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
* Return the current [FlowStateMachineImpl] or null if executing outside of one.
|
* Return the current [FlowStateMachineImpl] or null if executing outside of one.
|
||||||
*/
|
*/
|
||||||
fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*>
|
fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*>
|
||||||
|
|
||||||
/**
|
|
||||||
* Provide a mechanism to sleep within a Strand without locking any transactional state
|
|
||||||
*/
|
|
||||||
// TODO: inlined due to an intermittent Quasar error (to be fully investigated)
|
|
||||||
@Suppress("NOTHING_TO_INLINE")
|
|
||||||
@Suspendable
|
|
||||||
inline fun sleep(millis: Long) {
|
|
||||||
if (currentStateMachine() != null) {
|
|
||||||
val db = DatabaseTransactionManager.dataSource
|
|
||||||
DatabaseTransactionManager.current().commit()
|
|
||||||
DatabaseTransactionManager.current().close()
|
|
||||||
Strand.sleep(millis)
|
|
||||||
DatabaseTransactionManager.dataSource = db
|
|
||||||
DatabaseTransactionManager.newTransaction()
|
|
||||||
} else Strand.sleep(millis)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// These fields shouldn't be serialised, so they are marked @Transient.
|
// These fields shouldn't be serialised, so they are marked @Transient.
|
||||||
@ -259,6 +243,13 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
|
throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Provide a mechanism to sleep within a Strand without locking any transactional state.
|
||||||
|
// This checkpoints, since we cannot undo any database writes up to this point.
|
||||||
|
@Suspendable
|
||||||
|
override fun sleepUntil(until: Instant) {
|
||||||
|
suspend(Sleep(until, this))
|
||||||
|
}
|
||||||
|
|
||||||
// TODO Dummy implementation of access to application specific permission controls and audit logging
|
// TODO Dummy implementation of access to application specific permission controls and audit logging
|
||||||
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
|
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
|
||||||
// This is a hack to allow cash app access list of permitted issuer currency.
|
// This is a hack to allow cash app access list of permitted issuer currency.
|
||||||
@ -481,6 +472,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (exceptionDuringSuspend == null && ioRequest is Sleep) {
|
||||||
|
// Sleep on the fiber. This will not sleep if it's in the past.
|
||||||
|
Strand.sleep(Duration.between(Instant.now(), ioRequest.until).toNanos(), TimeUnit.NANOSECONDS)
|
||||||
|
}
|
||||||
createTransaction()
|
createTransaction()
|
||||||
// TODO Now that we're throwing outside of the suspend the FlowLogic can catch it. We need Quasar to terminate
|
// TODO Now that we're throwing outside of the suspend the FlowLogic can catch it. We need Quasar to terminate
|
||||||
// the fiber when exceptions occur inside a suspend.
|
// the fiber when exceptions occur inside a suspend.
|
||||||
|
@ -583,6 +583,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
when (ioRequest) {
|
when (ioRequest) {
|
||||||
is SendRequest -> processSendRequest(ioRequest)
|
is SendRequest -> processSendRequest(ioRequest)
|
||||||
is WaitForLedgerCommit -> processWaitForCommitRequest(ioRequest)
|
is WaitForLedgerCommit -> processWaitForCommitRequest(ioRequest)
|
||||||
|
is Sleep -> processSleepRequest(ioRequest)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -620,6 +621,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun processSleepRequest(ioRequest: Sleep) {
|
||||||
|
// Resume the fiber now we have checkpointed, so we can sleep on the Fiber.
|
||||||
|
resumeFiber(ioRequest.fiber)
|
||||||
|
}
|
||||||
|
|
||||||
private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null, retryId: Long? = null) {
|
private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null, retryId: Long? = null) {
|
||||||
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
|
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party)
|
||||||
?: throw IllegalArgumentException("Don't know about party $party")
|
?: throw IllegalArgumentException("Don't know about party $party")
|
||||||
|
@ -52,7 +52,7 @@ class InteractiveShellTest {
|
|||||||
private fun check(input: String, expected: String) {
|
private fun check(input: String, expected: String) {
|
||||||
var output: DummyFSM? = null
|
var output: DummyFSM? = null
|
||||||
InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om)
|
InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om)
|
||||||
assertEquals(expected, output!!.logic.a, input)
|
assertEquals(expected, output!!.flowA.a, input)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -84,4 +84,4 @@ class InteractiveShellTest {
|
|||||||
fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString())
|
fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString())
|
||||||
|
|
||||||
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> by mock()
|
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> by mock()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user