ENT-3496 Add suspendedTimestamp and secondsSpentWaiting to checkpoint dump

This commit is contained in:
LankyDan 2019-06-11 16:22:17 +01:00
parent 143499f6fd
commit 245a14d499

View File

@ -3,6 +3,7 @@ package net.corda.node.services.rpc
import co.paralleluniverse.fibers.Stack
import com.fasterxml.jackson.annotation.JsonAutoDetect
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.JsonValue
@ -41,10 +42,12 @@ import net.corda.node.services.statemachine.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
@ -87,7 +90,8 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
fun dump() {
try {
val file = serviceHub.configuration.baseDirectory / "logs" / "checkpoints_dump-${TIME_FORMATTER.format(serviceHub.clock.instant())}.zip"
val now = serviceHub.clock.instant()
val file = serviceHub.configuration.baseDirectory / "logs" / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip"
if (lock.getAndIncrement() == 0 && !file.exists()) {
file.parent.toFile().mkdirs()
database.transaction {
@ -95,7 +99,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
ZipOutputStream(file.outputStream()).use { zip ->
stream.forEach { (runId, serialisedCheckpoint) ->
val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
val json = checkpoint.toJson(runId.uuid)
val json = checkpoint.toJson(runId.uuid, now)
val jsonBytes = writer.writeValueAsBytes(json)
zip.putNextEntry(ZipEntry("${json.flowLogicClass.simpleName}-${runId.uuid}.json"))
zip.write(jsonBytes)
@ -112,7 +116,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
}
}
private fun Checkpoint.toJson(id: UUID): CheckpointJson {
private fun Checkpoint.toJson(id: UUID, now: Instant): CheckpointJson {
val (fiber, flowLogic) = when (flowState) {
is FlowState.Unstarted -> {
null to flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext)
@ -135,13 +139,12 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
} else {
emptyList()
}
return CheckpointJson(
id,
flowLogic.javaClass,
flowLogic,
flowCallStack,
(flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(),
(flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(suspendedTimestamp(), now),
invocationContext.origin.toOrigin(),
ourIdentity,
sessions.mapNotNull { it.value.toActiveSession(it.key) },
@ -149,17 +152,19 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
)
}
private fun Checkpoint.suspendedTimestamp(): Instant = invocationContext.trace.invocationId.timestamp
@Suppress("unused")
private class FlowCall(val flowClass: Class<*>, val progressStep: String?)
@Suppress("unused")
@JsonInclude(Include.NON_NULL)
private class Origin(
val rpc: String? = null,
val peer: CordaX500Name? = null,
val service: String? = null,
val scheduled: ScheduledStateRef? = null,
val shell: InvocationOrigin.Shell? = null
val rpc: String? = null,
val peer: CordaX500Name? = null,
val service: String? = null,
val scheduled: ScheduledStateRef? = null,
val shell: InvocationOrigin.Shell? = null
)
private fun InvocationOrigin.toOrigin(): Origin {
@ -198,19 +203,22 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
val waitForSessionConfirmations: FlowIORequest.WaitForSessionConfirmations? = null,
val customOperation: FlowIORequest.ExecuteAsyncOperation<*>? = null,
val forceCheckpoint: FlowIORequest.ForceCheckpoint? = null
)
) {
@JsonFormat(pattern ="yyyy-MM-dd'T'HH:mm:ss", timezone = "UTC")
lateinit var suspendedTimestamp: Instant
var secondsSpentWaiting: Long = 0
}
@Suppress("unused")
private class SendJson(val session: FlowSession, val sentPayloadType: Class<*>, val sentPayload: Any)
private fun FlowIORequest<*>.toSuspendedOn(): SuspendedOn {
private fun FlowIORequest<*>.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn {
fun Map<FlowSession, SerializedBytes<Any>>.toJson(): List<SendJson> {
return map {
val payload = it.value.deserialize()
SendJson(it.key, payload.javaClass, payload)
}
}
return when (this) {
is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson())
is FlowIORequest.Receive -> SuspendedOn(receive = sessions)
@ -226,6 +234,9 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
else -> SuspendedOn(customOperation = this)
}
}
}.also {
it.suspendedTimestamp = suspendedTimestamp
it.secondsSpentWaiting = TimeUnit.MILLISECONDS.toSeconds(Duration.between(suspendedTimestamp, now).toMillis())
}
}