Changed the counter parties in the query to be CordaX509Name (#6622)

This commit is contained in:
Alexey Kadyrov 2020-08-12 14:53:03 +01:00 committed by GitHub
parent 57de0c4eec
commit 518026c6c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 45 deletions

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
@ -10,18 +11,18 @@ import java.time.Duration
import java.time.Instant
/**
* Stage in which the flow is suspended
* Defines criteria to get waiting flows
*/
enum class WaitingSource {
SEND,
RECEIVE,
SEND_AND_RECEIVE,
CLOSE_SESSIONS,
WAIT_FOR_LEDGER_COMMIT,
GET_FLOW_INFO,
SLEEP,
WAIT_FOR_SESSIONS_CONFIRMATIONS,
EXTERNAL_OPERATION
data class WaitingFlowQuery(
val flowIds: MutableList<StateMachineRunId> = mutableListOf(),
val onlyIfSuspendedLongerThan: Duration = Duration.ZERO,
val waitingSources: MutableList<WaitingSource> = mutableListOf(),
val counterParties: MutableList<CordaX500Name> = mutableListOf()
) {
fun isDefined() = flowIds.isNotEmpty()
|| waitingSources.isNotEmpty()
|| counterParties.isNotEmpty()
|| onlyIfSuspendedLongerThan > Duration.ZERO
}
/**
@ -40,18 +41,18 @@ data class WaitingFlowInfo(
)
/**
* Defines criteria to get waiting flows
* Stage in which the flow is suspended
*/
data class WaitingFlowQuery(
val ids: MutableList<StateMachineRunId> = mutableListOf(),
val onlyIfSuspendedLongerThan: Duration = Duration.ZERO,
val waitingSource: MutableList<WaitingSource> = mutableListOf(),
val counterParties: MutableList<Party> = mutableListOf()
) {
fun isDefined() = ids.isNotEmpty()
|| waitingSource.isNotEmpty()
|| counterParties.isNotEmpty()
|| onlyIfSuspendedLongerThan > Duration.ZERO
enum class WaitingSource {
SEND,
RECEIVE,
SEND_AND_RECEIVE,
CLOSE_SESSIONS,
WAIT_FOR_LEDGER_COMMIT,
GET_FLOW_INFO,
SLEEP,
WAIT_FOR_SESSIONS_CONFIRMATIONS,
EXTERNAL_OPERATION
}
/**
@ -89,14 +90,14 @@ class FlowOperator(private val smm: StateMachineManager, private val clock: Cloc
override fun queryWaitingFlows(query: WaitingFlowQuery): Set<WaitingFlowInfo> {
var sequence = getAllWaitingFlows()
if (query.ids.isNotEmpty()) {
sequence = sequence.filter { it.id in query.ids }
if (query.flowIds.isNotEmpty()) {
sequence = sequence.filter { it.id in query.flowIds }
}
if (query.counterParties.isNotEmpty()) {
sequence = sequence.filter { it.isWaitingForParties(query.counterParties) }
}
if (query.waitingSource.isNotEmpty()) {
sequence = sequence.filter { it.waitingSource() in query.waitingSource }
if (query.waitingSources.isNotEmpty()) {
sequence = sequence.filter { it.waitingSource() in query.waitingSources }
}
if (query.onlyIfSuspendedLongerThan > Duration.ZERO) {
val now = clock.instant()
@ -125,13 +126,13 @@ class FlowOperator(private val smm: StateMachineManager, private val clock: Cloc
}
}
private fun FlowStateMachineImpl<*>.isWaitingForParties(parties: List<Party>): Boolean {
private fun FlowStateMachineImpl<*>.isWaitingForParties(parties: List<CordaX500Name>): Boolean {
return ioRequest()?.let { request ->
when (request) {
is FlowIORequest.GetFlowInfo -> request.sessions.any { it.counterparty in parties }
is FlowIORequest.Receive -> request.sessions.any { it.counterparty in parties }
is FlowIORequest.Send -> request.sessionToMessage.keys.any { it.counterparty in parties }
is FlowIORequest.SendAndReceive -> request.sessionToMessage.keys.any { it.counterparty in parties }
is FlowIORequest.GetFlowInfo -> request.sessions.any { it.counterparty.name in parties }
is FlowIORequest.Receive -> request.sessions.any { it.counterparty.name in parties }
is FlowIORequest.Send -> request.sessionToMessage.keys.any { it.counterparty.name in parties }
is FlowIORequest.SendAndReceive -> request.sessionToMessage.keys.any { it.counterparty.name in parties }
else -> false
}
} ?: false

View File

@ -104,7 +104,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME)
))
assertEquals(2, result.size)
@ -132,7 +132,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(daveParty)
WaitingFlowQuery(counterParties = mutableListOf(DAVE_NAME)
))
assertEquals(1, result.size)
@ -152,7 +152,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME)
))
assertEquals(1, result.size)
@ -176,7 +176,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(bobParty, daveParty)
WaitingFlowQuery(counterParties = mutableListOf(BOB_NAME, DAVE_NAME)
))
assertEquals(1, result.size)
@ -199,7 +199,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(
counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty),
counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME),
onlyIfSuspendedLongerThan = 4.seconds
))
assertEquals(1, result.size)
@ -227,8 +227,8 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(
counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty),
waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE, WaitingSource.GET_FLOW_INFO)
counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME),
waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE, WaitingSource.GET_FLOW_INFO)
))
assertEquals(2, result.size)
@ -258,7 +258,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryFlowsCurrentlyWaitingForPartiesGrouped(WaitingFlowQuery(
waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE)
waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE)
))
assertEquals(2, result.size)
@ -303,7 +303,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(ids = mutableListOf(charlieStart.id, eugeneStart.id)
WaitingFlowQuery(flowIds = mutableListOf(charlieStart.id, eugeneStart.id)
))
assertEquals(1, result.size)
@ -323,7 +323,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME)
))
assertEquals(1, result.size)
@ -343,7 +343,7 @@ class FlowOperatorTests {
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME)
))
assertEquals(1, result.size)
@ -366,7 +366,7 @@ class FlowOperatorTests {
executeTest(5.seconds, { future.complete(Unit) }) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME)
))
assertEquals(1, result.size)
@ -387,7 +387,7 @@ class FlowOperatorTests {
executeTest(5.seconds, { future.complete("Hello") }) {
val result = cut.queryWaitingFlows(WaitingFlowQuery(
waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION)
waitingSources = mutableListOf(WaitingSource.EXTERNAL_OPERATION)
)) // the list of counter parties must be empty to get any external operation
assertEquals(1, result.size)
@ -444,7 +444,7 @@ class FlowOperatorTests {
executeTest(5.seconds, { future.complete(Unit) }) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
WaitingFlowQuery(counterParties = mutableListOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME, DAVE_NAME, EUGENE_NAME)
))
assertEquals(1, result.size)