CORDA-3657/5459 inspect waiting flows (#6540)

* CORDA-3657 Extract information from state machine

`FlowReadOperations` interface provides functions that extract
information about flows from the state machine manager.

`FlowOperator` implements this interface (along with another currenly
empty interface).

* CORDA-3657 Rename function and use set

* initial test is passing

* wip

* done tests

* additional tests to cover more FlowIORequest variations

* completed tests

* The quasar.jar should nat have been changed

* Fixed issues reported by detekt

* got rid of sync objects, instead relying on nodes being offline

* Added extra grouping test and minor simplification

* Hospital test must use online node which fails on otherside

* Added additional information required for the ENT

* Added tests to cover SEND FlowIORequests

* using node name constants from the core testing module

* Changed flow operator to the query pattern

* made query fields mutable to simply building query

* fixed detekt issue

* Fixed test which had dependency on the order int the result (failed for windows)

* Fixed recommendations in PR

* Moved WrappedFlowExternalOperation and  WrappedFlowExternalAsyncOperation to FlowExternalOperation.kt as per PR comment

* Moved extension to FlowAsyncOperation

* removed unnecessarily brackets

Co-authored-by: LankyDan <danknewton@hotmail.com>
This commit is contained in:
Alexey Kadyrov 2020-08-12 10:14:05 +01:00 committed by GitHub
parent 5778edae8f
commit 0b6b69bbda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 953 additions and 55 deletions

View File

@ -1,6 +1,12 @@
package net.corda.core.flows
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.concurrent.asCordaFuture
import net.corda.core.node.ServiceHub
import java.util.concurrent.CompletableFuture
import java.util.function.Supplier
/**
* [FlowExternalAsyncOperation] represents an external future that blocks a flow from continuing until the future returned by
@ -62,4 +68,36 @@ interface FlowExternalOperation<R : Any> {
* de-duplicated if necessary inside the execute method.
*/
fun execute(deduplicationId: String): R
}
}
/**
* [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalAsyncOperation].
*/
internal class WrappedFlowExternalAsyncOperation<R : Any>(val operation: FlowExternalAsyncOperation<R>) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
return operation.execute(deduplicationId).asCordaFuture()
}
}
/**
* [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalOperation].
*
* The reference to [ServiceHub] is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a
* flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow.
*/
internal class WrappedFlowExternalOperation<R : Any>(
val serviceHub: ServiceHubCoreInternal,
val operation: FlowExternalOperation<R>
) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}

View File

@ -4,21 +4,18 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ServiceHubCoreInternal
import net.corda.core.internal.WaitForStateConsumption
import net.corda.core.internal.abbreviate
import net.corda.core.internal.checkPayloadIs
import net.corda.core.internal.concurrent.asCordaFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.node.NodeInfo
@ -34,8 +31,6 @@ import org.slf4j.Logger
import java.time.Duration
import java.util.HashMap
import java.util.LinkedHashMap
import java.util.concurrent.CompletableFuture
import java.util.function.Supplier
/**
* A sub-class of [FlowLogic<T>] implements a flow using direct, straight line blocking code. Thus you
@ -655,38 +650,6 @@ abstract class FlowLogic<out T> {
}
}
/**
* [WrappedFlowExternalAsyncOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalAsyncOperation].
*/
private class WrappedFlowExternalAsyncOperation<R : Any>(val operation: FlowExternalAsyncOperation<R>) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
return operation.execute(deduplicationId).asCordaFuture()
}
}
/**
* [WrappedFlowExternalOperation] is added to allow jackson to properly reference the data stored within the wrapped
* [FlowExternalOperation].
*
* The reference to [ServiceHub] is also needed by Kryo to properly keep a reference to [ServiceHub] so that
* [FlowExternalOperation] can be run from the [ServiceHubCoreInternal.externalOperationExecutor] without causing errors when retrying a
* flow. A [NullPointerException] is thrown if [FlowLogic.serviceHub] is accessed from [FlowLogic.await] when retrying a flow.
*/
private class WrappedFlowExternalOperation<R : Any>(
val serviceHub: ServiceHubCoreInternal,
val operation: FlowExternalOperation<R>
) : FlowAsyncOperation<R> {
override fun execute(deduplicationId: String): CordaFuture<R> {
// Using a [CompletableFuture] allows unhandled exceptions to be thrown inside the background operation
// the exceptions will be set on the future by [CompletableFuture.AsyncSupply.run]
return CompletableFuture.supplyAsync(
Supplier { this.operation.execute(deduplicationId) },
serviceHub.externalOperationExecutor
).asCordaFuture()
}
}
/**
* Version and name of the CorDapp hosting the other side of the flow.
*/

View File

@ -3,6 +3,8 @@ package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.WrappedFlowExternalAsyncOperation
import net.corda.core.flows.WrappedFlowExternalOperation
import net.corda.core.serialization.CordaSerializable
/**
@ -31,3 +33,14 @@ fun <T, R : Any> FlowLogic<T>.executeAsync(operation: FlowAsyncOperation<R>, may
val request = FlowIORequest.ExecuteAsyncOperation(operation)
return stateMachine.suspend(request, maySkipCheckpoint)
}
/**
* Returns a name of the external operation implementation considering that it can wrapped
* by WrappedFlowExternalAsyncOperation<T> or WrappedFlowExternalOperation<T>
*/
val FlowAsyncOperation<*>.externalOperationImplName: String
get() = when (this) {
is WrappedFlowExternalAsyncOperation<*> -> operation.javaClass.canonicalName
is WrappedFlowExternalOperation<*> -> operation.javaClass.canonicalName
else -> javaClass.canonicalName
}

View File

@ -128,6 +128,7 @@ import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowMonitor
import net.corda.node.services.statemachine.FlowOperator
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.SingleThreadedStateMachineManager
import net.corda.node.services.statemachine.StateMachineManager
@ -336,6 +337,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
@Suppress("LeakingThis")
val smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory, DBCheckpointStorage.MAX_CLIENT_ID_LENGTH)
val flowOperator = FlowOperator(smm, platformClock)
private val schedulerService = makeNodeSchedulerService()
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
@ -591,7 +593,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
val flowMonitor = FlowMonitor(
smm,
flowOperator,
configuration.flowMonitorPeriodMillis,
configuration.flowMonitorSuspensionLoggingThresholdMillis
)

View File

@ -15,7 +15,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
internal class FlowMonitor(
private val smm: StateMachineManager,
private val flowOperator: FlowOperator,
private val monitoringPeriod: Duration,
private val suspensionLoggingThreshold: Duration,
private var scheduler: ScheduledExecutorService? = null
@ -62,9 +62,7 @@ internal class FlowMonitor(
@VisibleForTesting
fun waitingFlowDurations(suspensionLoggingThreshold: Duration): Sequence<Pair<FlowStateMachineImpl<*>, Duration>> {
val now = Instant.now()
return smm.snapshot()
.asSequence()
.filter { flow -> flow !in smm.flowHospital && flow.isStarted() && flow.isSuspended() }
return flowOperator.getAllWaitingFlows()
.map { flow -> flow to flow.ongoingDuration(now) }
.filter { (_, suspensionDuration) -> suspensionDuration >= suspensionLoggingThreshold }
}
@ -93,15 +91,5 @@ internal class FlowMonitor(
private fun Iterable<FlowSession>.partiesInvolved() = map { it.counterparty }.joinToString(", ", "[", "]")
private fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest
private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant): Duration {
return transientState.checkpoint.timestamp.let { Duration.between(it, now) } ?: Duration.ZERO
}
private fun FlowStateMachineImpl<*>.isSuspended() = !snapshot().isFlowResumed
private fun FlowStateMachineImpl<*>.isStarted() = transientState.checkpoint.flowState is FlowState.Started
private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id)
}
}

View File

@ -0,0 +1,216 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.externalOperationImplName
import java.time.Clock
import java.time.Duration
import java.time.Instant
/**
* Stage in which the flow is suspended
*/
enum class WaitingSource {
SEND,
RECEIVE,
SEND_AND_RECEIVE,
CLOSE_SESSIONS,
WAIT_FOR_LEDGER_COMMIT,
GET_FLOW_INFO,
SLEEP,
WAIT_FOR_SESSIONS_CONFIRMATIONS,
EXTERNAL_OPERATION
}
/**
* Information about a flow which is waiting to be resumed
* The flow is considered to be waiting if:
* - It's started.
* - Is not admitted to the hospital
* - And it's in the suspended state for an IO request.
*/
data class WaitingFlowInfo(
val id: StateMachineRunId,
val suspendedTimestamp: Instant,
val source: WaitingSource,
val waitingForParties: List<Party>,
val externalOperationImplName: String? = null
)
/**
* Defines criteria to get waiting flows
*/
data class WaitingFlowQuery(
val ids: MutableList<StateMachineRunId> = mutableListOf(),
val onlyIfSuspendedLongerThan: Duration = Duration.ZERO,
val waitingSource: MutableList<WaitingSource> = mutableListOf(),
val counterParties: MutableList<Party> = mutableListOf()
) {
fun isDefined() = ids.isNotEmpty()
|| waitingSource.isNotEmpty()
|| counterParties.isNotEmpty()
|| onlyIfSuspendedLongerThan > Duration.ZERO
}
/**
* Read operations that extract information about the flows running in the node.
*/
interface FlowReadOperations {
/**
* Returns waiting flows for a specified query.
*/
fun queryWaitingFlows(query: WaitingFlowQuery): Set<WaitingFlowInfo>
/**
* Returns waiting flows for a specified query grouped by the party.
*/
fun queryFlowsCurrentlyWaitingForPartiesGrouped(query: WaitingFlowQuery): Map<Party, List<WaitingFlowInfo>>
/**
* Returns all waiting flow state machines.
*/
fun getAllWaitingFlows(): Sequence<FlowStateMachineImpl<*>>
}
/**
* Write operations that interact with the flows running in the node.
*/
interface FlowWriteOperations
/**
* Implements flow operators
* @see FlowReadOperations
* @see FlowWriteOperations
*/
class FlowOperator(private val smm: StateMachineManager, private val clock: Clock) : FlowReadOperations, FlowWriteOperations {
override fun queryWaitingFlows(query: WaitingFlowQuery): Set<WaitingFlowInfo> {
var sequence = getAllWaitingFlows()
if (query.ids.isNotEmpty()) {
sequence = sequence.filter { it.id in query.ids }
}
if (query.counterParties.isNotEmpty()) {
sequence = sequence.filter { it.isWaitingForParties(query.counterParties) }
}
if (query.waitingSource.isNotEmpty()) {
sequence = sequence.filter { it.waitingSource() in query.waitingSource }
}
if (query.onlyIfSuspendedLongerThan > Duration.ZERO) {
val now = clock.instant()
sequence = sequence.filter { flow -> flow.ongoingDuration(now) >= query.onlyIfSuspendedLongerThan }
}
val result = LinkedHashSet<WaitingFlowInfo>()
sequence.forEach { flow ->
val waitingParties = flow.waitingFlowInfo()
if (waitingParties != null) {
result.add(waitingParties)
}
}
return result
}
override fun queryFlowsCurrentlyWaitingForPartiesGrouped(query: WaitingFlowQuery): Map<Party, List<WaitingFlowInfo>> {
return queryWaitingFlows(query)
.flatMap { info -> info.waitingForParties.map { it to info } }
.groupBy({ it.first }) { it.second }
}
override fun getAllWaitingFlows(): Sequence<FlowStateMachineImpl<*>> {
return smm.snapshot()
.asSequence()
.filter { flow -> flow !in smm.flowHospital && flow.isStarted() && flow.isSuspended() }
}
}
private fun FlowStateMachineImpl<*>.isWaitingForParties(parties: List<Party>): Boolean {
return ioRequest()?.let { request ->
when (request) {
is FlowIORequest.GetFlowInfo -> request.sessions.any { it.counterparty in parties }
is FlowIORequest.Receive -> request.sessions.any { it.counterparty in parties }
is FlowIORequest.Send -> request.sessionToMessage.keys.any { it.counterparty in parties }
is FlowIORequest.SendAndReceive -> request.sessionToMessage.keys.any { it.counterparty in parties }
else -> false
}
} ?: false
}
@Suppress("ComplexMethod")
private fun FlowStateMachineImpl<*>.waitingSource(): WaitingSource? {
return ioRequest()?.let { request ->
when (request) {
is FlowIORequest.Send -> WaitingSource.SEND
is FlowIORequest.Receive -> WaitingSource.RECEIVE
is FlowIORequest.SendAndReceive -> WaitingSource.SEND_AND_RECEIVE
is FlowIORequest.CloseSessions -> WaitingSource.CLOSE_SESSIONS
is FlowIORequest.WaitForLedgerCommit -> WaitingSource.WAIT_FOR_LEDGER_COMMIT
is FlowIORequest.GetFlowInfo -> WaitingSource.GET_FLOW_INFO
is FlowIORequest.Sleep -> WaitingSource.SLEEP
is FlowIORequest.WaitForSessionConfirmations -> WaitingSource.WAIT_FOR_SESSIONS_CONFIRMATIONS
is FlowIORequest.ExecuteAsyncOperation -> WaitingSource.EXTERNAL_OPERATION
else -> null
}
}
}
@Suppress("ComplexMethod")
private fun FlowStateMachineImpl<*>.waitingFlowInfo(): WaitingFlowInfo? {
return ioRequest()?.let { request ->
when (request) {
is FlowIORequest.Send -> flowInfoOf(
WaitingSource.SEND,
request.sessionToMessage.map { it.key.counterparty }
)
is FlowIORequest.Receive -> flowInfoOf(
WaitingSource.RECEIVE,
request.sessions.map { it.counterparty }
)
is FlowIORequest.SendAndReceive -> flowInfoOf(
WaitingSource.SEND_AND_RECEIVE,
request.sessionToMessage.map { it.key.counterparty }
)
is FlowIORequest.CloseSessions -> flowInfoOf(
WaitingSource.CLOSE_SESSIONS,
request.sessions.map { it.counterparty }
)
is FlowIORequest.WaitForLedgerCommit -> flowInfoOf(
WaitingSource.WAIT_FOR_LEDGER_COMMIT,
listOf()
)
is FlowIORequest.GetFlowInfo -> flowInfoOf(
WaitingSource.GET_FLOW_INFO,
request.sessions.map { it.counterparty }
)
is FlowIORequest.Sleep -> flowInfoOf(
WaitingSource.SLEEP,
listOf()
)
is FlowIORequest.WaitForSessionConfirmations -> flowInfoOf(
WaitingSource.WAIT_FOR_SESSIONS_CONFIRMATIONS,
listOf()
)
is FlowIORequest.ExecuteAsyncOperation -> flowInfoOf(
WaitingSource.EXTERNAL_OPERATION,
listOf(),
request.operation.externalOperationImplName
)
else -> null
}
}
}
private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id)
private fun FlowStateMachineImpl<*>.flowInfoOf(
source: WaitingSource,
waitingForParties: List<Party>,
externalOperationName: String? = null
): WaitingFlowInfo =
WaitingFlowInfo(
id,
suspendedTimestamp(),
source,
waitingForParties,
externalOperationName)

View File

@ -0,0 +1,16 @@
package net.corda.node.services.statemachine
import java.time.Duration
import java.time.Instant
fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest
fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant): Duration {
return suspendedTimestamp().let { Duration.between(it, now) } ?: Duration.ZERO
}
fun FlowStateMachineImpl<*>.suspendedTimestamp() = transientState.checkpoint.timestamp
fun FlowStateMachineImpl<*>.isSuspended() = !snapshot().isFlowResumed
fun FlowStateMachineImpl<*>.isStarted() = transientState.checkpoint.flowState is FlowState.Started

View File

@ -260,7 +260,11 @@ class FlowFrameworkTests {
}
private fun monitorFlows(script: (FlowMonitor, FlowMonitor) -> Unit) {
script(FlowMonitor(aliceNode.smm, Duration.ZERO, Duration.ZERO), FlowMonitor(bobNode.smm, Duration.ZERO, Duration.ZERO))
val clock = Clock.systemUTC()
script(
FlowMonitor(FlowOperator(aliceNode.smm, clock), Duration.ZERO, Duration.ZERO),
FlowMonitor(FlowOperator(bobNode.smm, clock), Duration.ZERO, Duration.ZERO)
)
}
@Test(timeout = 300_000)

View File

@ -0,0 +1,595 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowExternalAsyncOperation
import net.corda.core.flows.FlowExternalOperation
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.MessageRecipients
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.Message
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DAVE_NAME
import net.corda.testing.core.executeTest
import net.corda.testing.flows.registerCordappFlowFactory
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CompletableFuture
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class FlowOperatorTests {
companion object {
val log = contextLogger()
val EUGENE_NAME = CordaX500Name("Eugene", "EugeneCorp", "GB")
}
lateinit var mockNet: InternalMockNetwork
lateinit var aliceNode: TestStartedNode
private lateinit var aliceParty: Party
lateinit var bobNode: TestStartedNode
private lateinit var bobParty: Party
lateinit var charlieNode: TestStartedNode
private lateinit var charlieParty: Party
lateinit var daveNode: TestStartedNode
lateinit var daveParty: Party
private lateinit var eugeneNode: TestStartedNode
private lateinit var eugeneParty: Party
@Before
fun setup() {
mockNet = InternalMockNetwork(
threadPerNode = true,
cordappsForAllNodes = listOf(enclosedCordapp())
)
aliceNode = mockNet.createNode(InternalMockNodeParameters(
legalName = ALICE_NAME
))
bobNode = mockNet.createNode(InternalMockNodeParameters(
legalName = BOB_NAME
))
charlieNode = mockNet.createNode(InternalMockNodeParameters(
legalName = CHARLIE_NAME
))
daveNode = mockNet.createNode(InternalMockNodeParameters(
legalName = DAVE_NAME
))
eugeneNode = mockNet.createNode(InternalMockNodeParameters(
legalName = EUGENE_NAME
))
mockNet.startNodes()
aliceParty = aliceNode.info.legalIdentities.first()
bobParty = bobNode.info.legalIdentities.first()
charlieParty = charlieNode.info.legalIdentities.first()
daveParty = daveNode.info.legalIdentities.first()
eugeneParty = eugeneNode.info.legalIdentities.first()
// put nodes offline, alice and charlie are staying online
bobNode.dispose()
daveNode.dispose()
eugeneNode.dispose()
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for counter party to process`() {
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { AcceptingFlow("Hello", it) }
val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
val daveStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
charlieNode.services.startFlow(ReceiveFlow("Hello", listOf(charlieParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
))
assertEquals(2, result.size)
val bob = result.first { it.waitingForParties.first().name == BOB_NAME }
assertNull(bob.externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, bob.source)
assertEquals(1, bob.waitingForParties.size)
assertEquals(bobStart.id, bob.id)
val dave = result.first { it.waitingForParties.first().name == DAVE_NAME }
assertNull(dave.externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, dave.source)
assertEquals(daveStart.id, dave.id)
assertEquals(1, dave.waitingForParties.size)
}
}
@Test(timeout = 300_000)
fun `query should return only requested party flows which are waiting for counter party to process`() {
aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
val daveStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(daveParty)
))
assertEquals(1, result.size)
assertEquals(daveStart.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(DAVE_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return all parties in a single flow which are waiting for counter party to process`() {
val start = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty, daveParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
))
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.first().source)
assertEquals(2, result.first().waitingForParties.size)
assertTrue(result.first().waitingForParties.any { it.name == BOB_NAME })
assertTrue(result.first().waitingForParties.any { it.name == DAVE_NAME })
}
}
@Test(timeout = 300_000)
fun `query should return only flows which are waiting for counter party to process and not in the hospital`() {
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { AcceptingFlow("Fail", it) }
aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(charlieParty)))
val daveStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(bobParty, daveParty)
))
assertEquals(1, result.size)
assertEquals(daveStart.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(DAVE_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return only flows which are waiting more than 4 seconds for counter party to process`() {
val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
Thread.sleep(4500)
aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(
counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty),
onlyIfSuspendedLongerThan = 4.seconds
))
assertEquals(1, result.size)
assertEquals(1, result.size)
assertEquals(bobStart.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(BOB_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `mixed query should return all flows which are waiting for counter party to process`() {
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { AcceptingFlow("Hello", it) }
val future = CompletableFuture<String>()
aliceNode.services.startFlow(ExternalAsyncOperationFlow(future))
val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
val daveStart = aliceNode.services.startFlow(GetFlowInfoFlow(listOf(daveParty)))
charlieNode.services.startFlow(ReceiveFlow("Hello", listOf(charlieParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(
counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty),
waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE, WaitingSource.GET_FLOW_INFO)
))
assertEquals(2, result.size)
val receive = result.first { it.source == WaitingSource.RECEIVE }
assertNull(receive.externalOperationImplName)
assertEquals(1, receive.waitingForParties.size)
assertEquals(bobStart.id, receive.id)
assertEquals(BOB_NAME, receive.waitingForParties.first().name)
val getFlowInfo = result.first { it.source == WaitingSource.GET_FLOW_INFO }
assertNull(getFlowInfo.externalOperationImplName)
assertEquals(1, getFlowInfo.waitingForParties.size)
assertEquals(daveStart.id, getFlowInfo.id)
assertEquals(DAVE_NAME, getFlowInfo.waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for counter party (the flow must have counter party) to process grouped by party`() {
val future = CompletableFuture<String>()
aliceNode.services.startFlow(ExternalAsyncOperationFlow(future))
val bobStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
val daveStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryFlowsCurrentlyWaitingForPartiesGrouped(WaitingFlowQuery(
waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION, WaitingSource.RECEIVE)
))
assertEquals(2, result.size)
assertEquals(1, result.getValue(bobParty).size)
assertNull(result.getValue(bobParty).first().externalOperationImplName)
assertEquals(bobStart.id, result.getValue(bobParty).first().id)
assertEquals(WaitingSource.RECEIVE, result.getValue(bobParty).first().source)
assertEquals(1, result.getValue(bobParty).first().waitingForParties.size)
assertEquals(BOB_NAME, result.getValue(bobParty).first().waitingForParties.first().name)
assertEquals(1, result.getValue(daveParty).size)
assertEquals(daveStart.id, result.getValue(daveParty).first().id)
assertNull(result.getValue(daveParty).first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.getValue(daveParty).first().source)
assertEquals(1, result.getValue(daveParty).first().waitingForParties.size)
assertEquals(DAVE_NAME, result.getValue(daveParty).first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `get should return all flow state machines which are waiting for other party to process`() {
aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(bobParty)))
aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.getAllWaitingFlows().toList()
assertEquals(2, result.size)
}
}
@Test(timeout = 300_000)
fun `query should return only requested by id flows which are waiting for counter party to process`() {
charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { AcceptingFlow("Fail", it) }
val charlieStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(charlieParty)))
aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(daveParty)))
val eugeneStart = aliceNode.services.startFlow(ReceiveFlow("Hello", listOf(eugeneParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(ids = mutableListOf(charlieStart.id, eugeneStart.id)
))
assertEquals(1, result.size)
assertEquals(eugeneStart.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(EUGENE_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for getting info about counter party`() {
val start = aliceNode.services.startFlow(GetFlowInfoFlow(listOf(eugeneParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
))
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.GET_FLOW_INFO, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(EUGENE_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for sending and receiving from counter party when stuck in remote party`() {
val start = aliceNode.services.startFlow(SendAndReceiveFlow("Hello", listOf(eugeneParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
))
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.RECEIVE, result.first().source) // yep, it's receive
assertEquals(1, result.first().waitingForParties.size)
assertEquals(EUGENE_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for sending and receiving from counter party when stuck in sending`() {
val future = CompletableFuture<Unit>()
aliceNode.setMessagingServiceSpy(BlockingMessageSpy("PauseSend", future))
val start = aliceNode.services.startFlow(SendAndReceiveFlow("PauseSend", listOf(eugeneParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds, { future.complete(Unit) }) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
))
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.SEND_AND_RECEIVE, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(EUGENE_NAME, result.first().waitingForParties.first().name)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for async external operations`() {
val future = CompletableFuture<String>()
val start = aliceNode.services.startFlow(ExternalAsyncOperationFlow(future))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds, { future.complete("Hello") }) {
val result = cut.queryWaitingFlows(WaitingFlowQuery(
waitingSource = mutableListOf(WaitingSource.EXTERNAL_OPERATION)
)) // the list of counter parties must be empty to get any external operation
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertEquals(ExternalAsyncOperationFlow.ExternalOperation::class.java.canonicalName, result.first().externalOperationImplName)
assertEquals(WaitingSource.EXTERNAL_OPERATION, result.first().source)
assertEquals(0, result.first().waitingForParties.size)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for external operations`() {
val future = CompletableFuture<String>()
val start = aliceNode.services.startFlow(ExternalOperationFlow(future))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds, { future.complete("Hello") }) {
val result = cut.queryWaitingFlows(WaitingFlowQuery())
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertEquals(ExternalOperationFlow.ExternalOperation::class.java.canonicalName, result.first().externalOperationImplName)
assertEquals(WaitingSource.EXTERNAL_OPERATION, result.first().source)
assertEquals(0, result.first().waitingForParties.size)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are sleeping`() {
val start = aliceNode.services.startFlow(SleepFlow())
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds) {
val result = cut.queryWaitingFlows(WaitingFlowQuery())
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.SLEEP, result.first().source)
assertEquals(0, result.first().waitingForParties.size)
}
}
@Test(timeout = 300_000)
fun `query should return all flows which are waiting for sending from counter party`() {
val future = CompletableFuture<Unit>()
aliceNode.setMessagingServiceSpy(BlockingMessageSpy("PauseSend", future))
val start = aliceNode.services.startFlow(SendFlow("PauseSend", listOf(eugeneParty)))
val cut = FlowOperator(aliceNode.smm, aliceNode.services.clock)
executeTest(5.seconds, { future.complete(Unit) }) {
val result = cut.queryWaitingFlows(
WaitingFlowQuery(counterParties = mutableListOf(aliceParty, bobParty, charlieParty, daveParty, eugeneParty)
))
assertEquals(1, result.size)
assertEquals(start.id, result.first().id)
assertNull(result.first().externalOperationImplName)
assertEquals(WaitingSource.SEND, result.first().source)
assertEquals(1, result.first().waitingForParties.size)
assertEquals(EUGENE_NAME, result.first().waitingForParties.first().name)
}
}
@InitiatingFlow
class ReceiveFlow(private val payload: String, private val otherParties: List<Party>) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@Suspendable
override fun call() {
if (payload == "Fail") {
error(payload)
}
val sessions = mutableMapOf<FlowSession, Class<out Any>>()
otherParties.forEach {
sessions[initiateFlow(it)] = String::class.java
}
receiveAllMap(sessions)
}
}
@InitiatingFlow
class SendAndReceiveFlow(private val payload: String, private val otherParties: List<Party>) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@Suspendable
override fun call() {
if (payload == "Fail") {
error(payload)
}
otherParties.forEach {
val session = initiateFlow(it)
session.sendAndReceive<String>(payload)
}
}
}
@InitiatingFlow
class GetFlowInfoFlow(private val otherParties: List<Party>) : FlowLogic<FlowInfo>() {
init {
require(otherParties.isNotEmpty())
}
@Suspendable
override fun call(): FlowInfo {
val flowInfo = otherParties.map {
val session = initiateFlow(it)
session.getCounterpartyFlowInfo()
}.toList()
return flowInfo.first()
}
}
@InitiatingFlow
class ExternalAsyncOperationFlow(private val future: CompletableFuture<String>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(ExternalOperation(future))
}
class ExternalOperation(private val future: CompletableFuture<String>) : FlowExternalAsyncOperation<String> {
override fun execute(deduplicationId: String): CompletableFuture<String> {
return future
}
}
}
@InitiatingFlow
class ExternalOperationFlow(private val future: CompletableFuture<String>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(ExternalOperation(future))
}
class ExternalOperation(private val future: CompletableFuture<String>) : FlowExternalOperation<String> {
override fun execute(deduplicationId: String): String {
return future.get()
}
}
}
@InitiatingFlow
class SleepFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
sleep(15.seconds)
}
}
@InitiatingFlow
class SendFlow(private val payload: String, private val otherParties: List<Party>) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
}
@Suspendable
override fun call() {
if (payload == "Fail") {
error(payload)
}
otherParties.forEach {
initiateFlow(it).send(payload)
}
}
}
class AcceptingFlow(private val payload: Any, private val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
if (payload == "Fail") {
error(payload)
}
otherPartySession.send(payload)
}
}
class BlockingMessageSpy(
private val expectedPayload: String,
private val future: CompletableFuture<Unit>
) : MessagingServiceSpy() {
@Suppress("TooGenericExceptionCaught")
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
try {
val sessionMessage = message.data.bytes.deserialize<InitialSessionMessage>()
if (sessionMessage.firstPayload?.deserialize<String>() == expectedPayload) {
future.get()
}
} catch (e: Throwable) {
log.error("Expected '${InitialSessionMessage::class.qualifiedName}'", e)
}
messagingService.send(message, target)
}
}
}

View File

@ -32,6 +32,8 @@ val BOB_NAME = CordaX500Name("Bob Plc", "Rome", "IT")
/** A test node name **/
@JvmField
val CHARLIE_NAME = CordaX500Name("Charlie Ltd", "Athens", "GR")
@JvmField
val DAVE_NAME = CordaX500Name("Dave Unlimited", "Warsaw", "PL")
/** Generates a dummy command that doesn't do anything useful for use in tests **/
fun dummyCommand(vararg signers: PublicKey = arrayOf(generateKeyPair().public)) = Command<TypeOnlyCommandData>(DummyCommandData, signers.toList())

View File

@ -12,6 +12,7 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.unspecifiedCountry
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.millis
import net.corda.nodeapi.internal.createDevNodeCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.crypto.CertificateType
@ -22,7 +23,10 @@ import java.math.BigInteger
import java.security.KeyPair
import java.security.PublicKey
import java.security.cert.X509Certificate
import java.time.Duration
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.fail
/**
* JAVA INTEROP
@ -162,3 +166,60 @@ fun NodeInfo.singleIdentityAndCert(): PartyAndCertificate = legalIdentitiesAndCe
*/
fun NodeInfo.singleIdentity(): Party = singleIdentityAndCert().party
/**
* Executes a test action, if test fails then it retries with a small delay until test succeeds or the timeout expires.
* Useful in cases when a the action side effect is not immediately observable and may take a ONLY few seconds.
* Which will allow the make the tests more deterministic instead of relaying on thread sleeping before asserting the side effects.
*
* Don't use with the large timeouts.
*
* Example usage:
*
* executeTest(5.seconds) {
* val result = cut.getWaitingFlows(WaitingFlowQuery(counterParties = listOf(bobParty, daveParty)))
* assertEquals(1, result.size)
* assertEquals(daveStart.id, result.first().id)
* assertNull(result.first().externalOperationImplName)
* assertEquals(WaitingSource.RECEIVE, result.first().source)
* assertEquals(1, result.first().waitingForParties.size)
* assertEquals(DAVE_NAME, result.first().waitingForParties.first().party.name)
* }
*
* The above will test our expectation that the getWaitingFlows action was executed successfully considering
* that it may take a few hundreds of milliseconds for the flow state machine states to settle.
*/
@Suppress("TooGenericExceptionCaught", "MagicNumber", "ComplexMethod")
fun <T> executeTest(
timeout: Duration,
cleanup: (() -> Unit)? = null,
retryDelay: Duration = 50.millis,
block: () -> T
): T {
val end = Instant.now().plus(timeout)
var lastException: Throwable?
do {
try {
val result = block()
try {
cleanup?.invoke()
} catch (e: Throwable) {
// Intentional
}
return result
} catch (e: Throwable) {
lastException = e
}
Thread.sleep(retryDelay.toMillis())
val now = Instant.now()
} while (now < end)
try {
cleanup?.invoke()
} catch (e: Throwable) {
// Intentional
}
if(lastException == null) {
fail("Failed to execute the operation n time")
} else {
throw lastException
}
}