From 3fdc69e54199b9eedddc82b597c95c73870cc998 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Wed, 11 Oct 2017 14:33:20 +0100 Subject: [PATCH] Fix coin selection with Flow-friendly sleep (#1847) --- .../kotlin/net/corda/core/flows/FlowLogic.kt | 31 ++++ .../corda/core/internal/FlowStateMachine.kt | 5 + docs/source/changelog.rst | 6 + finance/build.gradle | 3 + .../cash/selection/CashSelectionH2Impl.kt | 147 +++++++++--------- .../contracts/asset/CashSelectionH2Test.kt | 40 +++++ .../services/statemachine/FlowIORequest.kt | 6 + .../statemachine/FlowStateMachineImpl.kt | 38 ++--- .../statemachine/StateMachineManager.kt | 6 + .../net/corda/node/InteractiveShellTest.kt | 4 +- 10 files changed, 191 insertions(+), 95 deletions(-) create mode 100644 finance/src/test/kotlin/net/corda/finance/contracts/asset/CashSelectionH2Test.kt diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 419c918753..0239c81f20 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -1,6 +1,7 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.Strand import net.corda.core.crypto.SecureHash import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate @@ -16,6 +17,8 @@ import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.debug import org.slf4j.Logger +import java.time.Duration +import java.time.Instant /** * A sub-class of [FlowLogic] implements a flow using direct, straight line blocking code. Thus you @@ -43,6 +46,34 @@ abstract class FlowLogic { /** This is where you should log things to. */ val logger: Logger get() = stateMachine.logger + companion object { + /** + * Return the outermost [FlowLogic] instance, or null if not in a flow. + */ + @JvmStatic + val currentTopLevel: FlowLogic<*>? get() = (Strand.currentStrand() as? FlowStateMachine<*>)?.logic + + /** + * If on a flow, suspends the flow and only wakes it up after at least [duration] time has passed. Otherwise, + * just sleep for [duration]. This sleep function is not designed to aid scheduling, for which you should + * consider using [SchedulableState]. It is designed to aid with managing contention for which you have not + * managed via another means. + * + * Warning: long sleeps and in general long running flows are highly discouraged, as there is currently no + * support for flow migration! This method will throw an exception if you attempt to sleep for longer than + * 5 minutes. + */ + @Suspendable + @JvmStatic + @Throws(FlowException::class) + fun sleep(duration: Duration) { + if (duration.compareTo(Duration.ofMinutes(5)) > 0) { + throw FlowException("Attempt to sleep for longer than 5 minutes is not supported. Consider using SchedulableState.") + } + (Strand.currentStrand() as? FlowStateMachine<*>)?.sleepUntil(Instant.now() + duration) ?: Strand.sleep(duration.toMillis()) + } + } + /** * Returns a wrapped [java.util.UUID] object that identifies this state machine run (i.e. subflows have the same * identifier as their parents). diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index 261af49f02..a2b0e2fd15 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -10,6 +10,7 @@ import net.corda.core.node.ServiceHub import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.UntrustworthyData import org.slf4j.Logger +import java.time.Instant /** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */ interface FlowStateMachine { @@ -35,6 +36,9 @@ interface FlowStateMachine { @Suspendable fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction + @Suspendable + fun sleepUntil(until: Instant) + fun checkFlowPermission(permissionName: String, extraAuditData: Map) fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map) @@ -45,6 +49,7 @@ interface FlowStateMachine { @Suspendable fun persistFlowStackSnapshot(flowClass: Class>) + val logic: FlowLogic val serviceHub: ServiceHub val logger: Logger val id: StateMachineRunId diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 8b17db03ac..0931e57561 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -36,6 +36,12 @@ UNRELEASED * Cordformation node building DSL can have an additional parameter `configFile` with the path to a properties file to be appended to node.conf. +* ``FlowLogic`` now has a static method called ``sleep`` which can be used in certain circumstances to help with resolving + contention over states in flows. This should be used in place of any other sleep primitive since these are not compatible + with flows and their use will be prevented at some point in the future. Pay attention to the warnings and limitations + described in the documentation for this method. This helps resolve a bug in ``Cash`` coin selection. + A new static property `currentTopLevel` returns the top most `FlowLogic` instance, or null if not in a flow. + * ``CordaService`` annotated classes should be upgraded to take a constructor parameter of type ``AppServiceHub`` which allows services to start flows marked with the ``StartableByService`` annotation. For backwards compatability service classes with only ``ServiceHub`` constructors will still work. diff --git a/finance/build.gradle b/finance/build.gradle index c642969785..662e7c9aa5 100644 --- a/finance/build.gradle +++ b/finance/build.gradle @@ -32,6 +32,9 @@ dependencies { testCompile project(':test-utils') testCompile project(path: ':core', configuration: 'testArtifacts') testCompile "junit:junit:$junit_version" + + // AssertJ: for fluent assertions for testing + testCompile "org.assertj:assertj-core:$assertj_version" } configurations { diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt index a447827205..8be96121c8 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/CashSelectionH2Impl.kt @@ -1,12 +1,12 @@ package net.corda.finance.contracts.asset.cash.selection import co.paralleluniverse.fibers.Suspendable -import co.paralleluniverse.strands.Strand import net.corda.core.contracts.Amount import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.contracts.TransactionState import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.node.ServiceHub @@ -34,8 +34,9 @@ class CashSelectionH2Impl : CashSelection { } // coin selection retry loop counter, sleep (msecs) and lock for selecting states - private val MAX_RETRIES = 5 + private val MAX_RETRIES = 8 private val RETRY_SLEEP = 100 + private val RETRY_CAP = 2000 private val spendLock: ReentrantLock = ReentrantLock() /** @@ -73,78 +74,84 @@ class CashSelectionH2Impl : CashSelection { // 3) H2 does not support JOIN's in FOR UPDATE (hence we are forced to execute 2 queries) for (retryCount in 1..MAX_RETRIES) { - - spendLock.withLock { - val statement = services.jdbcSession().createStatement() - try { - statement.execute("CALL SET(@t, CAST(0 AS BIGINT));") - - // we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null) - // the softLockReserve update will detect whether we try to lock states locked by others - val selectJoin = """ - SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id - FROM vault_states AS vs, contract_cash_states AS ccs - WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index - AND vs.state_status = 0 - AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity} - AND (vs.lock_id = '$lockId' OR vs.lock_id is null) - """ + - (if (notary != null) - " AND vs.notary_name = '${notary.name}'" else "") + - (if (onlyFromIssuerParties.isNotEmpty()) - " AND ccs.issuer_key IN ($issuerKeysStr)" else "") + - (if (withIssuerRefs.isNotEmpty()) - " AND ccs.issuer_ref IN ($issuerRefsStr)" else "") - - // Retrieve spendable state refs - val rs = statement.executeQuery(selectJoin) + if (!attemptSpend(services, amount, lockId, notary, onlyFromIssuerParties, issuerKeysStr, withIssuerRefs, issuerRefsStr, stateAndRefs)) { + log.warn("Coin selection failed on attempt $retryCount") + // TODO: revisit the back off strategy for contended spending. + if (retryCount != MAX_RETRIES) { stateAndRefs.clear() - log.debug(selectJoin) - var totalPennies = 0L - while (rs.next()) { - val txHash = SecureHash.parse(rs.getString(1)) - val index = rs.getInt(2) - val stateRef = StateRef(txHash, index) - val state = rs.getBytes(3).deserialize>(context = SerializationDefaults.STORAGE_CONTEXT) - val pennies = rs.getLong(4) - totalPennies = rs.getLong(5) - val rowLockId = rs.getString(6) - stateAndRefs.add(StateAndRef(state, stateRef)) - log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" } - } - - if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) { - // we should have a minimum number of states to satisfy our selection `amount` criteria - log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") - - // With the current single threaded state machine available states are guaranteed to lock. - // TODO However, we will have to revisit these methods in the future multi-threaded. - services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet()) - return stateAndRefs - } - log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") - // retry as more states may become available - } catch (e: SQLException) { - log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId] - $e. - """) - } catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine - stateAndRefs.clear() - log.warn(e.message) - // retry only if there are locked states that may become available again (or consumed with change) - } finally { - statement.close() + val durationMillis = (minOf(RETRY_SLEEP.shl(retryCount), RETRY_CAP / 2) * (1.0 + Math.random())).toInt() + FlowLogic.sleep(durationMillis.millis) + } else { + log.warn("Insufficient spendable states identified for $amount") } - } - - log.warn("Coin selection failed on attempt $retryCount") - // TODO: revisit the back off strategy for contended spending. - if (retryCount != MAX_RETRIES) { - Strand.sleep(RETRY_SLEEP * retryCount.toLong()) + } else { + break } } - - log.warn("Insufficient spendable states identified for $amount") return stateAndRefs } + + private fun attemptSpend(services: ServiceHub, amount: Amount, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set, issuerKeysStr: String, withIssuerRefs: Set, issuerRefsStr: String, stateAndRefs: MutableList>): Boolean { + spendLock.withLock { + val statement = services.jdbcSession().createStatement() + try { + statement.execute("CALL SET(@t, CAST(0 AS BIGINT));") + + // we select spendable states irrespective of lock but prioritised by unlocked ones (Eg. null) + // the softLockReserve update will detect whether we try to lock states locked by others + val selectJoin = """ + SELECT vs.transaction_id, vs.output_index, vs.contract_state, ccs.pennies, SET(@t, ifnull(@t,0)+ccs.pennies) total_pennies, vs.lock_id + FROM vault_states AS vs, contract_cash_states AS ccs + WHERE vs.transaction_id = ccs.transaction_id AND vs.output_index = ccs.output_index + AND vs.state_status = 0 + AND ccs.ccy_code = '${amount.token}' and @t < ${amount.quantity} + AND (vs.lock_id = '$lockId' OR vs.lock_id is null) + """ + + (if (notary != null) + " AND vs.notary_name = '${notary.name}'" else "") + + (if (onlyFromIssuerParties.isNotEmpty()) + " AND ccs.issuer_key IN ($issuerKeysStr)" else "") + + (if (withIssuerRefs.isNotEmpty()) + " AND ccs.issuer_ref IN ($issuerRefsStr)" else "") + + // Retrieve spendable state refs + val rs = statement.executeQuery(selectJoin) + log.debug(selectJoin) + var totalPennies = 0L + while (rs.next()) { + val txHash = SecureHash.parse(rs.getString(1)) + val index = rs.getInt(2) + val stateRef = StateRef(txHash, index) + val state = rs.getBytes(3).deserialize>(context = SerializationDefaults.STORAGE_CONTEXT) + val pennies = rs.getLong(4) + totalPennies = rs.getLong(5) + val rowLockId = rs.getString(6) + stateAndRefs.add(StateAndRef(state, stateRef)) + log.trace { "ROW: $rowLockId ($lockId): $stateRef : $pennies ($totalPennies)" } + } + + if (stateAndRefs.isNotEmpty() && totalPennies >= amount.quantity) { + // we should have a minimum number of states to satisfy our selection `amount` criteria + log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs") + + // With the current single threaded state machine available states are guaranteed to lock. + // TODO However, we will have to revisit these methods in the future multi-threaded. + services.vaultService.softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet()) + return true + } + log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}") + // retry as more states may become available + } catch (e: SQLException) { + log.error("""Failed retrieving unconsumed states for: amount [$amount], onlyFromIssuerParties [$onlyFromIssuerParties], notary [$notary], lockId [$lockId] + $e. + """) + } catch (e: StatesNotAvailableException) { // Should never happen with single threaded state machine + log.warn(e.message) + // retry only if there are locked states that may become available again (or consumed with change) + } finally { + statement.close() + } + } + return false + } } \ No newline at end of file diff --git a/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashSelectionH2Test.kt b/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashSelectionH2Test.kt new file mode 100644 index 0000000000..ef85dead0a --- /dev/null +++ b/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashSelectionH2Test.kt @@ -0,0 +1,40 @@ +package net.corda.finance.contracts.asset + +import net.corda.core.utilities.getOrThrow +import net.corda.finance.DOLLARS +import net.corda.finance.flows.CashException +import net.corda.finance.flows.CashPaymentFlow +import net.corda.testing.chooseIdentity +import net.corda.testing.node.MockNetwork +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Test + + +class CashSelectionH2Test { + + @Test + fun `check does not hold connection over retries`() { + val mockNet = MockNetwork(threadPerNode = true) + try { + val notaryNode = mockNet.createNotaryNode() + val bankA = mockNet.createNode(configOverrides = { existingConfig -> + // Tweak connections to be minimal to make this easier (1 results in a hung node during start up, so use 2 connections). + existingConfig.dataSourceProperties.setProperty("maximumPoolSize", "2") + existingConfig + }) + + mockNet.startNodes() + + // Start more cash spends than we have connections. If spend leaks a connection on retry, we will run out of connections. + val flow1 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity())) + val flow2 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity())) + val flow3 = bankA.services.startFlow(CashPaymentFlow(amount = 100.DOLLARS, anonymous = false, recipient = notaryNode.info.chooseIdentity())) + + assertThatThrownBy { flow1.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java) + assertThatThrownBy { flow2.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java) + assertThatThrownBy { flow3.resultFuture.getOrThrow() }.isInstanceOf(CashException::class.java) + } finally { + mockNet.stopNodes() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt index cd56d786ad..bd29525072 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.SecureHash +import java.time.Instant interface FlowIORequest { // This is used to identify where we suspended, in case of message mismatch errors and other things where we @@ -112,4 +113,9 @@ data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachine override fun shouldResume(message: ExistingSessionMessage, session: FlowSessionInternal): Boolean = message is ErrorSessionEnd } +data class Sleep(val until: Instant, val fiber: FlowStateMachineImpl<*>) : FlowIORequest { + @Transient + override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot() +} + class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index c61ef66ead..019940363b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -12,13 +12,9 @@ import net.corda.core.crypto.random63BitValue import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate -import net.corda.core.internal.FlowStateMachine -import net.corda.core.internal.abbreviate +import net.corda.core.internal.* import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.isRegularFile -import net.corda.core.internal.staticField -import net.corda.core.internal.uncheckedCast import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.* import net.corda.node.services.api.FlowAppAuditEvent @@ -32,13 +28,15 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.nio.file.Paths import java.sql.SQLException +import java.time.Duration +import java.time.Instant import java.util.* import java.util.concurrent.TimeUnit class FlowPermissionException(message: String) : FlowException(message) class FlowStateMachineImpl(override val id: StateMachineRunId, - val logic: FlowLogic, + override val logic: FlowLogic, scheduler: FiberScheduler, override val flowInitiator: FlowInitiator, // Store the Party rather than the full cert path with PartyAndCertificate @@ -52,23 +50,6 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * Return the current [FlowStateMachineImpl] or null if executing outside of one. */ fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*> - - /** - * Provide a mechanism to sleep within a Strand without locking any transactional state - */ - // TODO: inlined due to an intermittent Quasar error (to be fully investigated) - @Suppress("NOTHING_TO_INLINE") - @Suspendable - inline fun sleep(millis: Long) { - if (currentStateMachine() != null) { - val db = DatabaseTransactionManager.dataSource - DatabaseTransactionManager.current().commit() - DatabaseTransactionManager.current().close() - Strand.sleep(millis) - DatabaseTransactionManager.dataSource = db - DatabaseTransactionManager.newTransaction() - } else Strand.sleep(millis) - } } // These fields shouldn't be serialised, so they are marked @Transient. @@ -259,6 +240,13 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage") } + // Provide a mechanism to sleep within a Strand without locking any transactional state. + // This checkpoints, since we cannot undo any database writes up to this point. + @Suspendable + override fun sleepUntil(until: Instant) { + suspend(Sleep(until, this)) + } + // TODO Dummy implementation of access to application specific permission controls and audit logging override fun checkFlowPermission(permissionName: String, extraAuditData: Map) { val permissionGranted = true // TODO define permission control service on ServiceHubInternal and actually check authorization. @@ -494,6 +482,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + if (exceptionDuringSuspend == null && ioRequest is Sleep) { + // Sleep on the fiber. This will not sleep if it's in the past. + Strand.sleep(Duration.between(Instant.now(), ioRequest.until).toNanos(), TimeUnit.NANOSECONDS) + } createTransaction() // TODO Now that we're throwing outside of the suspend the FlowLogic can catch it. We need Quasar to terminate // the fiber when exceptions occur inside a suspend. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 42fd677f65..2215354b07 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -584,6 +584,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, when (ioRequest) { is SendRequest -> processSendRequest(ioRequest) is WaitForLedgerCommit -> processWaitForCommitRequest(ioRequest) + is Sleep -> processSleepRequest(ioRequest) } } @@ -621,6 +622,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } + private fun processSleepRequest(ioRequest: Sleep) { + // Resume the fiber now we have checkpointed, so we can sleep on the Fiber. + resumeFiber(ioRequest.fiber) + } + private fun sendSessionMessage(party: Party, message: SessionMessage, fiber: FlowStateMachineImpl<*>? = null, retryId: Long? = null) { val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about party $party") diff --git a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt index f694eec66c..3d4005d6fc 100644 --- a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt +++ b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt @@ -52,7 +52,7 @@ class InteractiveShellTest { private fun check(input: String, expected: String) { var output: DummyFSM? = null InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om) - assertEquals(expected, output!!.logic.a, input) + assertEquals(expected, output!!.flowA.a, input) } @Test @@ -83,5 +83,5 @@ class InteractiveShellTest { @Test fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString()) - class DummyFSM(val logic: FlowA) : FlowStateMachine by mock() + class DummyFSM(val flowA: FlowA) : FlowStateMachine by mock() }