diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt index 7e7cc5b38d..b07e7391f4 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt @@ -3,6 +3,7 @@ package net.corda.node.services.rpc import co.paralleluniverse.fibers.Stack import com.fasterxml.jackson.annotation.JsonAutoDetect import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility +import com.fasterxml.jackson.annotation.JsonFormat import com.fasterxml.jackson.annotation.JsonInclude import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.annotation.JsonValue @@ -41,10 +42,12 @@ import net.corda.node.services.statemachine.* import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl import net.corda.serialization.internal.withTokenContext +import java.time.Duration import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.zip.ZipEntry import java.util.zip.ZipOutputStream @@ -87,7 +90,8 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private fun dump() { try { - val file = serviceHub.configuration.baseDirectory / "logs" / "checkpoints_dump-${TIME_FORMATTER.format(serviceHub.clock.instant())}.zip" + val now = serviceHub.clock.instant() + val file = serviceHub.configuration.baseDirectory / "logs" / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip" if (lock.getAndIncrement() == 0 && !file.exists()) { file.parent.toFile().mkdirs() database.transaction { @@ -95,7 +99,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private ZipOutputStream(file.outputStream()).use { zip -> stream.forEach { (runId, serialisedCheckpoint) -> val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) - val json = checkpoint.toJson(runId.uuid) + val json = checkpoint.toJson(runId.uuid, now) val jsonBytes = writer.writeValueAsBytes(json) zip.putNextEntry(ZipEntry("${json.flowLogicClass.simpleName}-${runId.uuid}.json")) zip.write(jsonBytes) @@ -112,7 +116,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private } } - private fun Checkpoint.toJson(id: UUID): CheckpointJson { + private fun Checkpoint.toJson(id: UUID, now: Instant): CheckpointJson { val (fiber, flowLogic) = when (flowState) { is FlowState.Unstarted -> { null to flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext) @@ -135,13 +139,12 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private } else { emptyList() } - return CheckpointJson( id, flowLogic.javaClass, flowLogic, flowCallStack, - (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(), + (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(suspendedTimestamp(), now), invocationContext.origin.toOrigin(), ourIdentity, sessions.mapNotNull { it.value.toActiveSession(it.key) }, @@ -149,17 +152,19 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private ) } + private fun Checkpoint.suspendedTimestamp(): Instant = invocationContext.trace.invocationId.timestamp + @Suppress("unused") private class FlowCall(val flowClass: Class<*>, val progressStep: String?) @Suppress("unused") @JsonInclude(Include.NON_NULL) private class Origin( - val rpc: String? = null, - val peer: CordaX500Name? = null, - val service: String? = null, - val scheduled: ScheduledStateRef? = null, - val shell: InvocationOrigin.Shell? = null + val rpc: String? = null, + val peer: CordaX500Name? = null, + val service: String? = null, + val scheduled: ScheduledStateRef? = null, + val shell: InvocationOrigin.Shell? = null ) private fun InvocationOrigin.toOrigin(): Origin { @@ -198,19 +203,22 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val waitForSessionConfirmations: FlowIORequest.WaitForSessionConfirmations? = null, val customOperation: FlowIORequest.ExecuteAsyncOperation<*>? = null, val forceCheckpoint: FlowIORequest.ForceCheckpoint? = null - ) + ) { + @JsonFormat(pattern ="yyyy-MM-dd'T'HH:mm:ss", timezone = "UTC") + lateinit var suspendedTimestamp: Instant + var secondsSpentWaiting: Long = 0 + } @Suppress("unused") private class SendJson(val session: FlowSession, val sentPayloadType: Class<*>, val sentPayload: Any) - private fun FlowIORequest<*>.toSuspendedOn(): SuspendedOn { + private fun FlowIORequest<*>.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn { fun Map>.toJson(): List { return map { val payload = it.value.deserialize() SendJson(it.key, payload.javaClass, payload) } } - return when (this) { is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson()) is FlowIORequest.Receive -> SuspendedOn(receive = sessions) @@ -226,6 +234,9 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private else -> SuspendedOn(customOperation = this) } } + }.also { + it.suspendedTimestamp = suspendedTimestamp + it.secondsSpentWaiting = TimeUnit.MILLISECONDS.toSeconds(Duration.between(suspendedTimestamp, now).toMillis()) } }