diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowOperator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowOperator.kt index 69adea4581..45fbff65b7 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowOperator.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowOperator.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine @@ -10,18 +11,18 @@ import java.time.Duration import java.time.Instant /** - * Stage in which the flow is suspended + * Defines criteria to get waiting flows */ -enum class WaitingSource { - SEND, - RECEIVE, - SEND_AND_RECEIVE, - CLOSE_SESSIONS, - WAIT_FOR_LEDGER_COMMIT, - GET_FLOW_INFO, - SLEEP, - WAIT_FOR_SESSIONS_CONFIRMATIONS, - EXTERNAL_OPERATION +data class WaitingFlowQuery( + val flowIds: MutableList = mutableListOf(), + val onlyIfSuspendedLongerThan: Duration = Duration.ZERO, + val waitingSources: MutableList = mutableListOf(), + val counterParties: MutableList = mutableListOf() +) { + fun isDefined() = flowIds.isNotEmpty() + || waitingSources.isNotEmpty() + || counterParties.isNotEmpty() + || onlyIfSuspendedLongerThan > Duration.ZERO } /** @@ -40,18 +41,18 @@ data class WaitingFlowInfo( ) /** - * Defines criteria to get waiting flows + * Stage in which the flow is suspended */ -data class WaitingFlowQuery( - val ids: MutableList = mutableListOf(), - val onlyIfSuspendedLongerThan: Duration = Duration.ZERO, - val waitingSource: MutableList = mutableListOf(), - val counterParties: MutableList = mutableListOf() -) { - fun isDefined() = ids.isNotEmpty() - || waitingSource.isNotEmpty() - || counterParties.isNotEmpty() - || onlyIfSuspendedLongerThan > Duration.ZERO +enum class WaitingSource { + SEND, + RECEIVE, + SEND_AND_RECEIVE, + CLOSE_SESSIONS, + WAIT_FOR_LEDGER_COMMIT, + GET_FLOW_INFO, + SLEEP, + WAIT_FOR_SESSIONS_CONFIRMATIONS, + EXTERNAL_OPERATION } /** @@ -89,14 +90,14 @@ class FlowOperator(private val smm: StateMachineManager, private val clock: Cloc override fun queryWaitingFlows(query: WaitingFlowQuery): Set { var sequence = getAllWaitingFlows() - if (query.ids.isNotEmpty()) { - sequence = sequence.filter { it.id in query.ids } + if (query.flowIds.isNotEmpty()) { + sequence = sequence.filter { it.id in query.flowIds } } if (query.counterParties.isNotEmpty()) { sequence = sequence.filter { it.isWaitingForParties(query.counterParties) } } - if (query.waitingSource.isNotEmpty()) { - sequence = sequence.filter { it.waitingSource() in query.waitingSource } + if (query.waitingSources.isNotEmpty()) { + sequence = sequence.filter { it.waitingSource() in query.waitingSources } } if (query.onlyIfSuspendedLongerThan > Duration.ZERO) { val now = clock.instant() @@ -125,13 +126,13 @@ class FlowOperator(private val smm: StateMachineManager, private val clock: Cloc } } -private fun FlowStateMachineImpl<*>.isWaitingForParties(parties: List): Boolean { +private fun FlowStateMachineImpl<*>.isWaitingForParties(parties: List): Boolean { return ioRequest()?.let { request -> when (request) { - is FlowIORequest.GetFlowInfo -> request.sessions.any { it.counterparty in parties } - is FlowIORequest.Receive -> request.sessions.any { it.counterparty in parties } - is FlowIORequest.Send -> request.sessionToMessage.keys.any { it.counterparty in parties } - is FlowIORequest.SendAndReceive -> request.sessionToMessage.keys.any { it.counterparty in parties } + is FlowIORequest.GetFlowInfo -> request.sessions.any { it.counterparty.name in parties } + is FlowIORequest.Receive -> request.sessions.any { it.counterparty.name in parties } + is FlowIORequest.Send -> request.sessionToMessage.keys.any { it.counterparty.name in parties } + is FlowIORequest.SendAndReceive -> request.sessionToMessage.keys.any { it.counterparty.name in parties } else -> false } } ?: false diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt index 304181c0ca..7bea95fe4d 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowOperatorTests.kt @@ -104,7 +104,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty) + WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME) )) assertEquals(2, result.size) @@ -132,7 +132,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(daveParty) + WaitingFlowQuery(counterParties = mutableListOf(DAVE_NAME) )) assertEquals(1, result.size) @@ -152,7 +152,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty) + WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME) )) assertEquals(1, result.size) @@ -176,7 +176,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(bobParty, daveParty) + WaitingFlowQuery(counterParties = mutableListOf(BOB_NAME, DAVE_NAME) )) assertEquals(1, result.size) @@ -199,7 +199,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( WaitingFlowQuery( - counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty), + counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME), onlyIfSuspendedLongerThan = 4.seconds )) assertEquals(1, result.size) @@ -227,8 +227,8 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( WaitingFlowQuery( - counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty), - waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE, WaitingSource.GET_FLOW_INFO) + counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME), + waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE, WaitingSource.GET_FLOW_INFO) )) assertEquals(2, result.size) @@ -258,7 +258,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryFlowsCurrentlyWaitingForPartiesGrouped(WaitingFlowQuery( - waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE) + waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE) )) assertEquals(2, result.size) @@ -303,7 +303,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(ids = mutableListOf(charlieStart.id, eugeneStart.id) + WaitingFlowQuery(flowIds = mutableListOf(charlieStart.id, eugeneStart.id) )) assertEquals(1, result.size) @@ -323,7 +323,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty) + WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME) )) assertEquals(1, result.size) @@ -343,7 +343,7 @@ class FlowOperatorTests { executeTest(5.seconds) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty) + WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME) )) assertEquals(1, result.size) @@ -366,7 +366,7 @@ class FlowOperatorTests { executeTest(5.seconds, { future.complete(Unit) }) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty) + WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME) )) assertEquals(1, result.size) @@ -387,7 +387,7 @@ class FlowOperatorTests { executeTest(5.seconds, { future.complete("Hello") }) { val result = cut.queryWaitingFlows(WaitingFlowQuery( - waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION) + waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION) )) // the list of counter parties must be empty to get any external operation assertEquals(1, result.size) @@ -444,7 +444,7 @@ class FlowOperatorTests { executeTest(5.seconds, { future.complete(Unit) }) { val result = cut.queryWaitingFlows( - WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty) + WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME) )) assertEquals(1, result.size)