mirror of
https://github.com/corda/corda.git
synced 2025-02-21 01:42:24 +00:00
CORDA-3197 Fix flow has been waiting message (#5660)
* Added a timestamp property to Checkpoint getting a new Instant.now() value at every Checkpoint instantiation/ copy instantiation. FlowMonitor is now using this new property (Checkpoint#timestamp) and StateMachineState#isFlowResumed to determine which flows are actually suspended. It leaves out flows that are doing work in their FlowLogic#call method. * Cleaner comment * Broke FlowMonitor#logFlowsWaitingForParty into logFlowsWaitingForParty and waitingFlowsToDurations. This way waitingFlowsToDurations is modular and can be tested. Made FlowMonitor constructor get StateMachineManager instead of the retrieveFlows lamda. This way FlowMonitor is more consistent as a service, and entire flow filtering process is now being done in FlowMonitor#waitingFlowsToDurations. Removed "smm as? StateMachineManagerInternal" in AbstractNode#start as it made no sense. Updated CheckpointDumper to mention the Checkpoint#timestamp when writing the checkpoint as json. * Added tests for FlowMonitor service. * Remove old comment * 1. FLowMonitor#waitingFlowDurations now returns a Sequence to have an iteration less. It used to be, one iteration from returning a Set from FLowMonitor#waitingFlowDurations plus one iteration from FlowMonitor#logFlowsWaitingForParty. 2. Code reformattings * 1. Remove constructor keyword from FlowMonitor 2. Code reformattings 3. Update detekt baseline * Resolve conflict in Detekt baseline
This commit is contained in:
parent
0f92c96d15
commit
21a075b727
@ -6,11 +6,6 @@
|
||||
<ID>ClassNaming:AbstractCashFlow.kt$AbstractCashFlow.Companion$GENERATING_ID : Step</ID>
|
||||
<ID>ClassNaming:AbstractCashFlow.kt$AbstractCashFlow.Companion$GENERATING_TX : Step</ID>
|
||||
<ID>ClassNaming:AbstractCashFlow.kt$AbstractCashFlow.Companion$SIGNING_TX : Step</ID>
|
||||
<ID>ClassNaming:BlobWriter.kt$_Li_</ID>
|
||||
<ID>ClassNaming:BlobWriter.kt$_Mis_</ID>
|
||||
<ID>ClassNaming:BlobWriter.kt$_i_</ID>
|
||||
<ID>ClassNaming:BlobWriter.kt$_i_is__</ID>
|
||||
<ID>ClassNaming:BlobWriter.kt$_is_</ID>
|
||||
<ID>ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step</ID>
|
||||
<ID>ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_</ID>
|
||||
<ID>ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step</ID>
|
||||
@ -145,7 +140,7 @@
|
||||
<ID>ComplexMethod:DriverDSLImpl.kt$DriverDSLImpl$override fun start()</ID>
|
||||
<ID>ComplexMethod:Expect.kt$ fun <S, E : Any> S.genericExpectEvents( isStrict: Boolean = true, stream: S.((E) -> Unit) -> Unit, expectCompose: () -> ExpectCompose<E> )</ID>
|
||||
<ID>ComplexMethod:FinalityFlow.kt$FinalityFlow$@Suspendable @Throws(NotaryException::class) override fun call(): SignedTransaction</ID>
|
||||
<ID>ComplexMethod:FlowMonitor.kt$FlowMonitor$private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String</ID>
|
||||
<ID>ComplexMethod:FlowMonitor.kt$FlowMonitor$private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, suspensionDuration: Duration): String</ID>
|
||||
<ID>ComplexMethod:FlowStateMachineImpl.kt$FlowStateMachineImpl$ @Suspendable private fun processEventsUntilFlowIsResumed(isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): Any?</ID>
|
||||
<ID>ComplexMethod:GenerateRpcSslCertsCli.kt$GenerateRpcSslCerts$private fun generateRpcSslCertificates(conf: NodeConfiguration)</ID>
|
||||
<ID>ComplexMethod:GenericsTests.kt$GenericsTests$@Test fun nestedSerializationInMultipleContextsDoesntColideGenericTypes()</ID>
|
||||
@ -716,7 +711,6 @@
|
||||
<ID>LongParameterList:ParametersUtilities.kt$( notaries: List<NotaryInfo> = emptyList(), minimumPlatformVersion: Int = 1, modifiedTime: Instant = Instant.now(), maxMessageSize: Int = 10485760, // TODO: Make this configurable and consistence across driver, bootstrapper, demobench and NetworkMapServer maxTransactionSize: Int = maxMessageSize * 50, whitelistedContractImplementations: Map<String, List<AttachmentId>> = emptyMap(), epoch: Int = 1, eventHorizon: Duration = 30.days, packageOwnership: Map<String, PublicKey> = emptyMap() )</ID>
|
||||
<ID>LongParameterList:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$( states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef> )</ID>
|
||||
<ID>LongParameterList:PhysicalLocationStructures.kt$WorldCoordinate$(screenWidth: Double, screenHeight: Double, topLatitude: Double, bottomLatitude: Double, leftLongitude: Double, rightLongitude: Double)</ID>
|
||||
<ID>LongParameterList:ProcessUtilities.kt$ProcessUtilities$( arguments: List<String>, classPath: List<String> = defaultClassPath, workingDirectory: Path? = null, jdwpPort: Int? = null, extraJvmArguments: List<String> = emptyList(), maximumHeapSize: String? = null )</ID>
|
||||
<ID>LongParameterList:QueryCriteria.kt$QueryCriteria.FungibleAssetQueryCriteria$( participants: List<AbstractParty>? = this.participants, owner: List<AbstractParty>? = this.owner, quantity: ColumnPredicate<Long>? = this.quantity, issuer: List<AbstractParty>? = this.issuer, issuerRef: List<OpaqueBytes>? = this.issuerRef, status: Vault.StateStatus = this.status, contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes )</ID>
|
||||
<ID>LongParameterList:QueryCriteria.kt$QueryCriteria.FungibleAssetQueryCriteria$( participants: List<AbstractParty>? = this.participants, owner: List<AbstractParty>? = this.owner, quantity: ColumnPredicate<Long>? = this.quantity, issuer: List<AbstractParty>? = this.issuer, issuerRef: List<OpaqueBytes>? = this.issuerRef, status: Vault.StateStatus = this.status, contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes, relevancyStatus: Vault.RelevancyStatus = this.relevancyStatus )</ID>
|
||||
<ID>LongParameterList:QueryCriteria.kt$QueryCriteria.LinearStateQueryCriteria$( participants: List<AbstractParty>? = this.participants, uuid: List<UUID>? = this.uuid, externalId: List<String>? = this.externalId, status: Vault.StateStatus = this.status, contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes, relevancyStatus: Vault.RelevancyStatus = this.relevancyStatus )</ID>
|
||||
@ -2114,11 +2108,8 @@
|
||||
<ID>MaxLineLength:FlowMessaging.kt$FlowMessagingImpl$val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name }</ID>
|
||||
<ID>MaxLineLength:FlowMessaging.kt$FlowMessagingImpl$val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party))</ID>
|
||||
<ID>MaxLineLength:FlowMessaging.kt$FlowMessagingImpl${ // Handling Kryo and AMQP serialization problems. Unfortunately the two exception types do not share much of a common exception interface. if ((exception is KryoException || exception is NotSerializableException) && message is ExistingSessionMessage && message.payload is ErrorSessionMessage) { val error = message.payload.flowException val rewrappedError = FlowException(error?.message) message.copy(payload = message.payload.copy(flowException = rewrappedError)).serialize() } else { throw exception } }</ID>
|
||||
<ID>MaxLineLength:FlowMonitor.kt$FlowMonitor$is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"</ID>
|
||||
<ID>MaxLineLength:FlowMonitor.kt$FlowMonitor$is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}"</ID>
|
||||
<ID>MaxLineLength:FlowMonitor.kt$FlowMonitor$is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"</ID>
|
||||
<ID>MaxLineLength:FlowMonitor.kt$FlowMonitor$scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty(suspensionLoggingThreshold) }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)</ID>
|
||||
<ID>MaxLineLength:FlowMonitor.kt$FlowMonitor$val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ")</ID>
|
||||
<ID>MaxLineLength:FlowRetryTest.kt$FlowRetryTest$it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()</ID>
|
||||
<ID>MaxLineLength:FlowSessionImpl.kt$FlowSessionImpl$@Suspendable override</ID>
|
||||
<ID>MaxLineLength:FlowStackSnapshot.kt$FlowStackSnapshotFactoryImpl$private</ID>
|
||||
|
@ -500,22 +500,19 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
smm.start(frozenTokenizableServices)
|
||||
// Shut down the SMM so no Fibers are scheduled.
|
||||
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||
(smm as? StateMachineManagerInternal)?.let {
|
||||
val flowMonitor = FlowMonitor({
|
||||
smm.snapshot().filter { flow -> flow !in smm.flowHospital }.toSet()
|
||||
}, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis)
|
||||
runOnStop += flowMonitor::stop
|
||||
flowMonitor.start()
|
||||
}
|
||||
|
||||
val flowMonitor = FlowMonitor(
|
||||
smm,
|
||||
configuration.flowMonitorPeriodMillis,
|
||||
configuration.flowMonitorSuspensionLoggingThresholdMillis
|
||||
)
|
||||
runOnStop += flowMonitor::stop
|
||||
flowMonitor.start()
|
||||
schedulerService.start()
|
||||
|
||||
createStartedNode(nodeInfo, rpcOps, notaryService).also { _started = it }
|
||||
}
|
||||
}
|
||||
|
||||
private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id)
|
||||
|
||||
/** Subclasses must override this to create a "started" node of the desired type, using the provided machinery. */
|
||||
abstract fun createStartedNode(nodeInfo: NodeInfo, rpcOps: CordaRPCOps, notaryService: NotaryService?): S
|
||||
|
||||
|
@ -184,7 +184,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
flowCallStackSummary = flowCallStack.toSummary(),
|
||||
flowCallStack = flowCallStack,
|
||||
suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(
|
||||
suspendedTimestamp(),
|
||||
timestamp,
|
||||
now
|
||||
),
|
||||
origin = invocationContext.origin.toOrigin(),
|
||||
@ -194,8 +194,6 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
)
|
||||
}
|
||||
|
||||
private fun Checkpoint.suspendedTimestamp(): Instant = invocationContext.trace.invocationId.timestamp
|
||||
|
||||
private fun checkpointDeserializationErrorMessage(
|
||||
checkpointId: StateMachineRunId,
|
||||
exception: Exception
|
||||
|
@ -2,6 +2,8 @@ package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.LifecycleSupport
|
||||
import java.time.Duration
|
||||
@ -12,10 +14,12 @@ import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
internal class FlowMonitor constructor(private val retrieveFlows: () -> Set<FlowStateMachineImpl<*>>,
|
||||
private val monitoringPeriod: Duration,
|
||||
private val suspensionLoggingThreshold: Duration,
|
||||
private var scheduler: ScheduledExecutorService? = null) : LifecycleSupport {
|
||||
internal class FlowMonitor(
|
||||
private val smm: StateMachineManager,
|
||||
private val monitoringPeriod: Duration,
|
||||
private val suspensionLoggingThreshold: Duration,
|
||||
private var scheduler: ScheduledExecutorService? = null
|
||||
) : LifecycleSupport {
|
||||
|
||||
private companion object {
|
||||
private fun defaultScheduler(): ScheduledExecutorService {
|
||||
@ -35,7 +39,7 @@ internal class FlowMonitor constructor(private val retrieveFlows: () -> Set<Flow
|
||||
scheduler = defaultScheduler()
|
||||
shutdownScheduler = true
|
||||
}
|
||||
scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty(suspensionLoggingThreshold) }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
|
||||
scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty() }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
|
||||
started = true
|
||||
}
|
||||
}
|
||||
@ -49,30 +53,38 @@ internal class FlowMonitor constructor(private val retrieveFlows: () -> Set<Flow
|
||||
}
|
||||
}
|
||||
|
||||
private fun logFlowsWaitingForParty(suspensionLoggingThreshold: Duration) {
|
||||
val now = Instant.now()
|
||||
val flows = retrieveFlows()
|
||||
for (flow in flows) {
|
||||
if (flow.isStarted() && flow.ongoingDuration(now) >= suspensionLoggingThreshold) {
|
||||
flow.ioRequest()?.let { request -> warningMessageForFlowWaitingOnIo(request, flow, now) }?.let(logger::info)
|
||||
}
|
||||
private fun logFlowsWaitingForParty() {
|
||||
for ((flow, suspensionDuration) in waitingFlowDurations(suspensionLoggingThreshold)) {
|
||||
flow.ioRequest()?.let { request -> logger.info(warningMessageForFlowWaitingOnIo(request, flow, suspensionDuration)) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String {
|
||||
val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ")
|
||||
@VisibleForTesting
|
||||
fun waitingFlowDurations(suspensionLoggingThreshold: Duration): Sequence<Pair<FlowStateMachineImpl<*>, Duration>> {
|
||||
val now = Instant.now()
|
||||
return smm.snapshot()
|
||||
.asSequence()
|
||||
.filter { flow -> flow !in smm.flowHospital && flow.isStarted() && flow.isSuspended() }
|
||||
.map { flow -> flow to flow.ongoingDuration(now) }
|
||||
.filter { (_, suspensionDuration) -> suspensionDuration >= suspensionLoggingThreshold }
|
||||
}
|
||||
|
||||
private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>,
|
||||
flow: FlowStateMachineImpl<*>,
|
||||
suspensionDuration: Duration): String {
|
||||
val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${suspensionDuration.toMillis() / 1000} seconds ")
|
||||
message.append(
|
||||
when (request) {
|
||||
is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}"
|
||||
is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}"
|
||||
is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}"
|
||||
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
|
||||
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
|
||||
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
|
||||
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
|
||||
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
|
||||
FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow"
|
||||
}
|
||||
when (request) {
|
||||
is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}"
|
||||
is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}"
|
||||
is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}"
|
||||
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
|
||||
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
|
||||
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
|
||||
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
|
||||
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
|
||||
FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow"
|
||||
}
|
||||
)
|
||||
message.append(".")
|
||||
return message.toString()
|
||||
@ -82,9 +94,13 @@ internal class FlowMonitor constructor(private val retrieveFlows: () -> Set<Flow
|
||||
|
||||
private fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest
|
||||
|
||||
private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant) = Duration.between(createdAt(), now)
|
||||
private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant): Duration {
|
||||
return transientState?.value?.checkpoint?.timestamp?.let { Duration.between(it, now) } ?: Duration.ZERO
|
||||
}
|
||||
|
||||
private fun FlowStateMachineImpl<*>.createdAt() = context.trace.invocationId.timestamp
|
||||
private fun FlowStateMachineImpl<*>.isSuspended() = !snapshot().isFlowResumed
|
||||
|
||||
private fun FlowStateMachineImpl<*>.isStarted() = transientState?.value?.checkpoint?.flowState is FlowState.Started
|
||||
|
||||
private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id)
|
||||
}
|
@ -791,7 +791,12 @@ class SingleThreadedStateMachineManager(
|
||||
is FlowState.Started -> {
|
||||
val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null
|
||||
val state = StateMachineState(
|
||||
checkpoint = checkpoint,
|
||||
// Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value.
|
||||
// The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension.
|
||||
// We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period.
|
||||
// If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor
|
||||
// could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up.
|
||||
checkpoint = checkpoint.copy(),
|
||||
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
|
||||
isFlowResumed = false,
|
||||
isTransactionTracked = false,
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.Try
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import java.time.Instant
|
||||
|
||||
/**
|
||||
* The state of the state machine, capturing the state of a flow. It consists of two parts, an *immutable* part that is
|
||||
@ -62,6 +63,9 @@ data class Checkpoint(
|
||||
val errorState: ErrorState,
|
||||
val numberOfSuspends: Int
|
||||
) {
|
||||
|
||||
val timestamp: Instant = Instant.now() // This will get updated every time a Checkpoint object is created/ created by copy.
|
||||
|
||||
companion object {
|
||||
|
||||
fun create(
|
||||
|
@ -46,10 +46,12 @@ import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.function.Predicate
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
|
||||
class FlowFrameworkTests {
|
||||
@ -158,6 +160,54 @@ class FlowFrameworkTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowMonitor flow suspends on a FlowIORequest`() { // alice flow only, suspends on a FlowIORequest
|
||||
monitorFlows { aliceFlowMonitor, bobFlowMonitor ->
|
||||
val terminationSignal = Semaphore(0)
|
||||
// bob's flow need to wait otherwise it could end the session prematurely
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { NoOpFlow( terminateUponSignal = terminationSignal) }
|
||||
aliceNode.services.startFlow(ReceiveFlow(bob))
|
||||
mockNet.runNetwork()
|
||||
assertEquals(1, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
assertEquals(0, bobFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
// continue bob's NoOpFlow, it will send an EndSessionMessage to alice
|
||||
terminationSignal.release()
|
||||
mockNet.runNetwork()
|
||||
// alice's ReceiveFlow is not finished because bob sent an EndSessionMessage, check that flow is no longer waiting
|
||||
assertEquals(0, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowMonitor flows suspend on a FlowIORequest`() { // alice and bob's flows, both suspend on a FlowIORequest
|
||||
monitorFlows { aliceFlowMonitor, bobFlowMonitor ->
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedReceiveFlow(it) }
|
||||
aliceNode.services.startFlow(ReceiveFlow(bob))
|
||||
mockNet.runNetwork()
|
||||
// both flows are suspened on a receive from the counter party
|
||||
assertEquals(1, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
assertEquals(1, bobFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `FlowMonitor flow is running`() { // flow is running a "take a long time" task
|
||||
monitorFlows { aliceFlowMonitor, _ ->
|
||||
val terminationSignal = Semaphore(0)
|
||||
// "take a long time" task, implemented by a NoOpFlow stuck in call method
|
||||
aliceNode.services.startFlow(NoOpFlow( terminateUponSignal = terminationSignal))
|
||||
mockNet.waitQuiescent() // current thread needs to wait fiber running on a different thread, has reached the blocking point
|
||||
assertEquals(0, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
// "take a long time" flow continues ...
|
||||
terminationSignal.release()
|
||||
assertEquals(0, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size)
|
||||
}
|
||||
}
|
||||
|
||||
private fun monitorFlows(script: (FlowMonitor, FlowMonitor) -> Unit) {
|
||||
script(FlowMonitor(aliceNode.smm, Duration.ZERO, Duration.ZERO), FlowMonitor(bobNode.smm, Duration.ZERO, Duration.ZERO))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `receiving unexpected session end before entering sendAndReceive`() {
|
||||
bobNode.registerCordappFlowFactory(WaitForOtherSideEndBeforeSendAndReceive::class) { NoOpFlow() }
|
||||
@ -709,7 +759,10 @@ internal open class SendFlow(private val payload: Any, private vararg val otherP
|
||||
}
|
||||
}
|
||||
|
||||
internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
|
||||
internal class NoOpFlow(
|
||||
val nonTerminating: Boolean = false,
|
||||
@Transient val terminateUponSignal: Semaphore? = null
|
||||
) : FlowLogic<Unit>() {
|
||||
@Transient
|
||||
var flowStarted = false
|
||||
|
||||
@ -719,6 +772,8 @@ internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>()
|
||||
if (nonTerminating) {
|
||||
Fiber.park()
|
||||
}
|
||||
|
||||
terminateUponSignal?.acquire() // block at Semaphore and resume upon external signaling
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user