[CORDA-1605]: When one node in a transaction fails to respond, operator of initiating node needs to understand which node it is waiting for (fix) (#3393)

This commit is contained in:
Michele Sollecito 2018-06-18 17:59:00 +01:00 committed by GitHub
parent 100008b139
commit 20aca788ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 117 additions and 2 deletions

View File

@ -7,6 +7,8 @@ release, see :doc:`upgrade-notes`.
Unreleased
==========
* Added a ``FlowMonitor`` to log information about flows that have been waiting for IO more than a configurable threshold.
* H2 database changes:
* The node's H2 database now listens on ``localhost`` by default.
* The database server address must also be enabled in the node configuration.

View File

@ -232,6 +232,10 @@ absolute path to the node's base directory.
which indicates that the issuer of the TLS certificate is also the issuer of the CRL.
Note: If this parameter is set then the tlsCertCrlDistPoint needs to be set as well.
:flowMonitorPeriodMillis: ``Duration`` of the period suspended flows waiting for IO are logged. Default value is ``60 seconds``.
:flowMonitorSuspensionLoggingThresholdMillis: Threshold ``Duration`` suspended flows waiting for IO need to exceed before they are logged. Default value is ``60 seconds``.
Examples
--------

View File

@ -349,6 +349,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
smm.start(tokenizableServices)
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
(smm as? StateMachineManagerInternal)?.let {
val flowMonitor = FlowMonitor(smm::snapshot, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis)
runOnStop += { flowMonitor.stop() }
flowMonitor.start()
}
schedulerService.start()
}
_started = this

View File

@ -26,6 +26,9 @@ import java.util.*
val Int.MB: Long get() = this * 1024L * 1024L
private val DEFAULT_FLOW_MONITOR_PERIOD_MILLIS: Duration = Duration.ofMinutes(1)
private val DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS: Duration = Duration.ofMinutes(1)
interface NodeConfiguration : NodeSSLConfiguration {
val myLegalName: CordaX500Name
val emailAddress: String
@ -62,6 +65,9 @@ interface NodeConfiguration : NodeSSLConfiguration {
val tlsCertCrlDistPoint: URL?
val tlsCertCrlIssuer: String?
val effectiveH2Settings: NodeH2Settings?
val flowMonitorPeriodMillis: Duration get() = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS
val flowMonitorSuspensionLoggingThresholdMillis: Duration get() = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
fun validate(): List<String>
companion object {
@ -193,7 +199,9 @@ data class NodeConfigurationImpl(
private val h2port: Int? = null,
private val h2Settings: NodeH2Settings? = null,
// do not use or remove (used by Capsule)
private val jarDirs: List<String> = emptyList()
private val jarDirs: List<String> = emptyList(),
override val flowMonitorPeriodMillis: Duration = DEFAULT_FLOW_MONITOR_PERIOD_MILLIS,
override val flowMonitorSuspensionLoggingThresholdMillis: Duration = DEFAULT_FLOW_MONITOR_SUSPENSION_LOGGING_THRESHOLD_MILLIS
) : NodeConfiguration {
companion object {
private val logger = loggerFor<NodeConfigurationImpl>()
@ -477,4 +485,4 @@ data class SecurityConfiguration(val authService: SecurityConfiguration.AuthServ
id = AuthServiceId("NODE_CONFIG"))
}
}
}
}

View File

@ -0,0 +1,89 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.FlowSession
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.LifecycleSupport
import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
internal class FlowMonitor constructor(private val retrieveFlows: () -> Set<FlowStateMachineImpl<*>>,
private val monitoringPeriod: Duration,
private val suspensionLoggingThreshold: Duration,
private var scheduler: ScheduledExecutorService? = null) : LifecycleSupport {
private companion object {
private fun defaultScheduler(): ScheduledExecutorService {
return Executors.newSingleThreadScheduledExecutor()
}
private val logger = loggerFor<FlowMonitor>()
}
override var started = false
private var shutdownScheduler = false
override fun start() {
synchronized(this) {
if (scheduler == null) {
scheduler = defaultScheduler()
shutdownScheduler = true
}
scheduler!!.scheduleAtFixedRate({ logFlowsWaitingForParty(suspensionLoggingThreshold) }, 0, monitoringPeriod.toMillis(), TimeUnit.MILLISECONDS)
started = true
}
}
override fun stop() {
synchronized(this) {
if (shutdownScheduler) {
scheduler!!.shutdown()
}
started = false
}
}
private fun logFlowsWaitingForParty(suspensionLoggingThreshold: Duration) {
val now = Instant.now()
val flows = retrieveFlows()
for (flow in flows) {
if (flow.isStarted() && flow.ongoingDuration(now) >= suspensionLoggingThreshold) {
flow.ioRequest()?.let { request -> warningMessageForFlowWaitingOnIo(request, flow, now) }?.let(logger::info)
}
}
}
private fun warningMessageForFlowWaitingOnIo(request: FlowIORequest<*>, flow: FlowStateMachineImpl<*>, now: Instant): String {
val message = StringBuilder("Flow with id ${flow.id.uuid} has been waiting for ${flow.ongoingDuration(now).toMillis() / 1000} seconds ")
message.append(
when (request) {
is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}"
is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}"
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
}
)
message.append(".")
return message.toString()
}
private fun Iterable<FlowSession>.partiesInvolved() = map { it.counterparty }.joinToString(", ", "[", "]")
private fun FlowStateMachineImpl<*>.ioRequest() = (snapshot().checkpoint.flowState as? FlowState.Started)?.flowIORequest
private fun FlowStateMachineImpl<*>.ongoingDuration(now: Instant) = Duration.between(createdAt(), now)
private fun FlowStateMachineImpl<*>.createdAt() = context.trace.invocationId.timestamp
private fun FlowStateMachineImpl<*>.isStarted() = transientState?.value?.checkpoint?.flowState is FlowState.Started
}

View File

@ -142,6 +142,8 @@ class SingleThreadedStateMachineManager(
}
}
override fun snapshot(): Set<FlowStateMachineImpl<*>> = mutex.content.flows.values.map { it.fiber }.toSet()
override fun <A : FlowLogic<*>> findStateMachines(flowClass: Class<A>): List<Pair<A, CordaFuture<*>>> {
return mutex.locked {
flows.values.mapNotNull {

View File

@ -84,6 +84,11 @@ interface StateMachineManager {
fun deliverExternalEvent(event: ExternalEvent)
val flowHospital: StaffedFlowHospital
/**
* Returns a snapshot of all [FlowStateMachineImpl]s currently managed.
*/
fun snapshot(): Set<FlowStateMachineImpl<*>>
}
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call