diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 7d6ad2dac4..53f7b49139 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -6,11 +6,6 @@ ClassNaming:AbstractCashFlow.kt$AbstractCashFlow.Companion$GENERATING_ID : Step ClassNaming:AbstractCashFlow.kt$AbstractCashFlow.Companion$GENERATING_TX : Step ClassNaming:AbstractCashFlow.kt$AbstractCashFlow.Companion$SIGNING_TX : Step - ClassNaming:BlobWriter.kt$_Li_ - ClassNaming:BlobWriter.kt$_Mis_ - ClassNaming:BlobWriter.kt$_i_ - ClassNaming:BlobWriter.kt$_i_is__ - ClassNaming:BlobWriter.kt$_is_ ClassNaming:BuyerFlow.kt$BuyerFlow$STARTING_BUY : Step ClassNaming:CompositeMemberCompositeSchemaToClassCarpenterTests.kt$I_ ClassNaming:CordaServiceTest.kt$CordaServiceTest.DummyServiceFlow.Companion$TEST_STEP : Step @@ -145,7 +140,7 @@ ComplexMethod:DriverDSLImpl.kt$DriverDSLImpl$override fun start() ComplexMethod:Expect.kt$ fun <S, E : Any> S.genericExpectEvents( isStrict: Boolean = true, stream: S.((E) -> Unit) -> Unit, expectCompose: () -> ExpectCompose<E> ) ComplexMethod:FinalityFlow.kt$FinalityFlow$@Suspendable @Throws(NotaryException::class) override fun call(): SignedTransaction - ComplexMethod:FlowMonitor.kt$FlowMonitor$private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String + ComplexMethod:FlowMonitor.kt$FlowMonitor$private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, suspensionDuration: Duration): String ComplexMethod:FlowStateMachineImpl.kt$FlowStateMachineImpl$ @Suspendable private fun processEventsUntilFlowIsResumed(isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): Any? ComplexMethod:GenerateRpcSslCertsCli.kt$GenerateRpcSslCerts$private fun generateRpcSslCertificates(conf: NodeConfiguration) ComplexMethod:GenericsTests.kt$GenericsTests$@Test fun nestedSerializationInMultipleContextsDoesntColideGenericTypes() @@ -716,7 +711,6 @@ LongParameterList:ParametersUtilities.kt$( notaries: List<NotaryInfo> = emptyList(), minimumPlatformVersion: Int = 1, modifiedTime: Instant = Instant.now(), maxMessageSize: Int = 10485760, // TODO: Make this configurable and consistence across driver, bootstrapper, demobench and NetworkMapServer maxTransactionSize: Int = maxMessageSize * 50, whitelistedContractImplementations: Map<String, List<AttachmentId>> = emptyMap(), epoch: Int = 1, eventHorizon: Duration = 30.days, packageOwnership: Map<String, PublicKey> = emptyMap() ) LongParameterList:PersistentUniquenessProvider.kt$PersistentUniquenessProvider$( states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef> ) LongParameterList:PhysicalLocationStructures.kt$WorldCoordinate$(screenWidth: Double, screenHeight: Double, topLatitude: Double, bottomLatitude: Double, leftLongitude: Double, rightLongitude: Double) - LongParameterList:ProcessUtilities.kt$ProcessUtilities$( arguments: List<String>, classPath: List<String> = defaultClassPath, workingDirectory: Path? = null, jdwpPort: Int? = null, extraJvmArguments: List<String> = emptyList(), maximumHeapSize: String? = null ) LongParameterList:QueryCriteria.kt$QueryCriteria.FungibleAssetQueryCriteria$( participants: List<AbstractParty>? = this.participants, owner: List<AbstractParty>? = this.owner, quantity: ColumnPredicate<Long>? = this.quantity, issuer: List<AbstractParty>? = this.issuer, issuerRef: List<OpaqueBytes>? = this.issuerRef, status: Vault.StateStatus = this.status, contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes ) LongParameterList:QueryCriteria.kt$QueryCriteria.FungibleAssetQueryCriteria$( participants: List<AbstractParty>? = this.participants, owner: List<AbstractParty>? = this.owner, quantity: ColumnPredicate<Long>? = this.quantity, issuer: List<AbstractParty>? = this.issuer, issuerRef: List<OpaqueBytes>? = this.issuerRef, status: Vault.StateStatus = this.status, contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes, relevancyStatus: Vault.RelevancyStatus = this.relevancyStatus ) LongParameterList:QueryCriteria.kt$QueryCriteria.LinearStateQueryCriteria$( participants: List<AbstractParty>? = this.participants, uuid: List<UUID>? = this.uuid, externalId: List<String>? = this.externalId, status: Vault.StateStatus = this.status, contractStateTypes: Set<Class<out ContractState>>? = this.contractStateTypes, relevancyStatus: Vault.RelevancyStatus = this.relevancyStatus ) @@ -2114,11 +2108,8 @@ MaxLineLength:FlowMessaging.kt$FlowMessagingImpl$val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name } MaxLineLength:FlowMessaging.kt$FlowMessagingImpl$val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party)) MaxLineLength:FlowMessaging.kt$FlowMessagingImpl${ // Handling Kryo and AMQP serialization problems. Unfortunately the two exception types do not share much of a common exception interface. if ((exception is KryoException || exception is NotSerializableException) && message is ExistingSessionMessage && message.payload is ErrorSessionMessage) { val error = message.payload.flowException val rewrappedError = FlowException(error?.message) message.copy(payload = message.payload.copy(flowException = rewrappedError)).serialize() } else { throw exception } } - MaxLineLength:FlowMonitor.kt$FlowMonitor$is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" MaxLineLength:FlowMonitor.kt$FlowMonitor$is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}" MaxLineLength:FlowMonitor.kt$FlowMonitor$is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" - MaxLineLength:FlowMonitor.kt$FlowMonitor$scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty(suspensionLoggingThreshold) }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS) - MaxLineLength:FlowMonitor.kt$FlowMonitor$val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ") MaxLineLength:FlowRetryTest.kt$FlowRetryTest$it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow() MaxLineLength:FlowSessionImpl.kt$FlowSessionImpl$@Suspendable override MaxLineLength:FlowStackSnapshot.kt$FlowStackSnapshotFactoryImpl$private diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 675f1c2c98..7f39fce87f 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -500,22 +500,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration, smm.start(frozenTokenizableServices) // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } - (smm as? StateMachineManagerInternal)?.let { - val flowMonitor = FlowMonitor({ - smm.snapshot().filter { flow -> flow !in smm.flowHospital }.toSet() - }, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis) - runOnStop += flowMonitor::stop - flowMonitor.start() - } - + val flowMonitor = FlowMonitor( + smm, + configuration.flowMonitorPeriodMillis, + configuration.flowMonitorSuspensionLoggingThresholdMillis + ) + runOnStop += flowMonitor::stop + flowMonitor.start() schedulerService.start() createStartedNode(nodeInfo, rpcOps, notaryService).also { _started = it } } } - private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id) - /** Subclasses must override this to create a "started" node of the desired type, using the provided machinery. */ abstract fun createStartedNode(nodeInfo: NodeInfo, rpcOps: CordaRPCOps, notaryService: NotaryService?): S diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt index 5bf3107339..5a2ef80d56 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt @@ -184,7 +184,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private flowCallStackSummary = flowCallStack.toSummary(), flowCallStack = flowCallStack, suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn( - suspendedTimestamp(), + timestamp, now ), origin = invocationContext.origin.toOrigin(), @@ -194,8 +194,6 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private ) } - private fun Checkpoint.suspendedTimestamp(): Instant = invocationContext.trace.invocationId.timestamp - private fun checkpointDeserializationErrorMessage( checkpointId: StateMachineRunId, exception: Exception diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt index 245b4eea67..b947f62f2b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -2,6 +2,8 @@ package net.corda.node.services.statemachine import net.corda.core.flows.FlowSession import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.VisibleForTesting import net.corda.core.utilities.loggerFor import net.corda.node.internal.LifecycleSupport import java.time.Duration @@ -12,10 +14,12 @@ import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit -internal class FlowMonitor constructor(private val retrieveFlows: () -> Set>, - private val monitoringPeriod: Duration, - private val suspensionLoggingThreshold: Duration, - private var scheduler: ScheduledExecutorService? = null) : LifecycleSupport { +internal class FlowMonitor( + private val smm: StateMachineManager, + private val monitoringPeriod: Duration, + private val suspensionLoggingThreshold: Duration, + private var scheduler: ScheduledExecutorService? = null +) : LifecycleSupport { private companion object { private fun defaultScheduler(): ScheduledExecutorService { @@ -35,7 +39,7 @@ internal class FlowMonitor constructor(private val retrieveFlows: () -> Set Set= suspensionLoggingThreshold) { - flow.ioRequest()?.let { request -> warningMessageForFlowWaitingOnIo(request, flow, now) }?.let(logger::info) - } + private fun logFlowsWaitingForParty() { + for ((flow, suspensionDuration) in waitingFlowDurations(suspensionLoggingThreshold)) { + flow.ioRequest()?.let { request -> logger.info(warningMessageForFlowWaitingOnIo(request, flow, suspensionDuration)) } } } - private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String { - val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ") + @VisibleForTesting + fun waitingFlowDurations(suspensionLoggingThreshold: Duration): Sequence, Duration>> { + val now = Instant.now() + return smm.snapshot() + .asSequence() + .filter { flow -> flow !in smm.flowHospital && flow.isStarted() && flow.isSuspended() } + .map { flow -> flow to flow.ongoingDuration(now) } + .filter { (_, suspensionDuration) -> suspensionDuration >= suspensionLoggingThreshold } + } + + private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, + flow: FlowStateMachineImpl<*>, + suspensionDuration: Duration): String { + val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${suspensionDuration.toMillis() / 1000} seconds ") message.append( - when (request) { - is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}" - is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}" - is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}" - is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" - is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" - is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" - FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" - is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" - FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow" - } + when (request) { + is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}" + is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}" + is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}" + is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" + is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" + is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" + FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" + is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" + FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow" + } ) message.append(".") return message.toString() @@ -82,9 +94,13 @@ internal class FlowMonitor constructor(private val retrieveFlows: () -> Set.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest - private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant) = Duration.between(createdAt(), now) + private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant): Duration { + return transientState?.value?.checkpoint?.timestamp?.let { Duration.between(it, now) } ?: Duration.ZERO + } - private fun FlowStateMachineImpl<*>.createdAt() = context.trace.invocationId.timestamp + private fun FlowStateMachineImpl<*>.isSuspended() = !snapshot().isFlowResumed private fun FlowStateMachineImpl<*>.isStarted() = transientState?.value?.checkpoint?.flowState is FlowState.Started + + private operator fun StaffedFlowHospital.contains(flow: FlowStateMachine<*>) = contains(flow.id) } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index a8ba2dc4e6..6252c9dcdd 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -791,7 +791,12 @@ class SingleThreadedStateMachineManager( is FlowState.Started -> { val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null val state = StateMachineState( - checkpoint = checkpoint, + // Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value. + // The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension. + // We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period. + // If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor + // could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up. + checkpoint = checkpoint.copy(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), isFlowResumed = false, isTransactionTracked = false, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 7816cbf9d5..77e1153181 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -10,6 +10,7 @@ import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler +import java.time.Instant /** * The state of the state machine, capturing the state of a flow. It consists of two parts, an *immutable* part that is @@ -62,6 +63,9 @@ data class Checkpoint( val errorState: ErrorState, val numberOfSuspends: Int ) { + + val timestamp: Instant = Instant.now() // This will get updated every time a Checkpoint object is created/ created by copy. + companion object { fun create( diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 8e1ebc4aa5..f8d6812790 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -46,10 +46,12 @@ import org.junit.Before import org.junit.Test import rx.Notification import rx.Observable +import java.time.Duration import java.time.Instant import java.util.* import java.util.function.Predicate import kotlin.reflect.KClass +import kotlin.test.assertEquals import kotlin.test.assertFailsWith class FlowFrameworkTests { @@ -158,6 +160,54 @@ class FlowFrameworkTests { } } + @Test + fun `FlowMonitor flow suspends on a FlowIORequest`() { // alice flow only, suspends on a FlowIORequest + monitorFlows { aliceFlowMonitor, bobFlowMonitor -> + val terminationSignal = Semaphore(0) + // bob's flow need to wait otherwise it could end the session prematurely + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { NoOpFlow( terminateUponSignal = terminationSignal) } + aliceNode.services.startFlow(ReceiveFlow(bob)) + mockNet.runNetwork() + assertEquals(1, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + assertEquals(0, bobFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + // continue bob's NoOpFlow, it will send an EndSessionMessage to alice + terminationSignal.release() + mockNet.runNetwork() + // alice's ReceiveFlow is not finished because bob sent an EndSessionMessage, check that flow is no longer waiting + assertEquals(0, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + } + } + + @Test + fun `FlowMonitor flows suspend on a FlowIORequest`() { // alice and bob's flows, both suspend on a FlowIORequest + monitorFlows { aliceFlowMonitor, bobFlowMonitor -> + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedReceiveFlow(it) } + aliceNode.services.startFlow(ReceiveFlow(bob)) + mockNet.runNetwork() + // both flows are suspened on a receive from the counter party + assertEquals(1, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + assertEquals(1, bobFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + } + } + + @Test + fun `FlowMonitor flow is running`() { // flow is running a "take a long time" task + monitorFlows { aliceFlowMonitor, _ -> + val terminationSignal = Semaphore(0) + // "take a long time" task, implemented by a NoOpFlow stuck in call method + aliceNode.services.startFlow(NoOpFlow( terminateUponSignal = terminationSignal)) + mockNet.waitQuiescent() // current thread needs to wait fiber running on a different thread, has reached the blocking point + assertEquals(0, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + // "take a long time" flow continues ... + terminationSignal.release() + assertEquals(0, aliceFlowMonitor.waitingFlowDurations(Duration.ZERO).toSet().size) + } + } + + private fun monitorFlows(script: (FlowMonitor, FlowMonitor) -> Unit) { + script(FlowMonitor(aliceNode.smm, Duration.ZERO, Duration.ZERO), FlowMonitor(bobNode.smm, Duration.ZERO, Duration.ZERO)) + } + @Test fun `receiving unexpected session end before entering sendAndReceive`() { bobNode.registerCordappFlowFactory(WaitForOtherSideEndBeforeSendAndReceive::class) { NoOpFlow() } @@ -709,7 +759,10 @@ internal open class SendFlow(private val payload: Any, private vararg val otherP } } -internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() { +internal class NoOpFlow( + val nonTerminating: Boolean = false, + @Transient val terminateUponSignal: Semaphore? = null +) : FlowLogic() { @Transient var flowStarted = false @@ -719,6 +772,8 @@ internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() if (nonTerminating) { Fiber.park() } + + terminateUponSignal?.acquire() // block at Semaphore and resume upon external signaling } }