From 5619debf18d64b0f4189eddb940512535c041320 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Fri, 24 May 2019 17:21:27 +0100 Subject: [PATCH] ENT-3496: dumpCheckpoints RPC Dumps all the node's checkpoints as JSON into a single zip file in the node's directory. The output contains: * All the fields for the top-level flow * The current sub-flow call stack, along with the current progress tracker step for each sub-flow * The event that suspended the flow, which if it's a send or sendAndReceive will show the payload that was sent * Low level information on the active sessions with other peers --- .../client/jackson/internal/CordaModule.kt | 8 +- .../net/corda/core/messaging/CordaRPCOps.kt | 3 + .../net/corda/node/internal/AbstractNode.kt | 23 +- .../corda/node/internal/CordaRPCOpsImpl.kt | 30 +- .../node/services/rpc/CheckpointDumper.kt | 298 ++++++++++++++++++ 5 files changed, 341 insertions(+), 21 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt diff --git a/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt b/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt index 14d82e2ab4..910f679d67 100644 --- a/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt +++ b/client/jackson/src/main/kotlin/net/corda/client/jackson/internal/CordaModule.kt @@ -3,7 +3,7 @@ package net.corda.client.jackson.internal import com.fasterxml.jackson.annotation.* -import com.fasterxml.jackson.annotation.JsonCreator.Mode.* +import com.fasterxml.jackson.annotation.JsonCreator.Mode.DISABLED import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonParseException @@ -34,6 +34,7 @@ import net.corda.core.internal.DigitalSignatureWithCert import net.corda.core.internal.createComponentGroups import net.corda.core.internal.kotlinObjectInstance import net.corda.core.node.NodeInfo +import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -43,7 +44,8 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.parseAsHex import net.corda.core.utilities.toHexString import net.corda.serialization.internal.AllWhitelist -import net.corda.serialization.internal.amqp.* +import net.corda.serialization.internal.amqp.SerializerFactoryBuilder +import net.corda.serialization.internal.amqp.hasCordaSerializable import java.math.BigDecimal import java.security.PublicKey import java.security.cert.CertPath @@ -97,7 +99,7 @@ private class CordaSerializableBeanSerializerModifier : BeanSerializerModifier() beanDesc: BeanDescription, beanProperties: MutableList): MutableList { val beanClass = beanDesc.beanClass - if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null) { + if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null && !SerializeAsToken::class.java.isAssignableFrom(beanClass)) { val typeInformation = serializerFactory.getTypeInformation(beanClass) val properties = typeInformation.propertiesOrEmptyMap val amqpProperties = properties.mapNotNull { (name, property) -> diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index ecc03e5763..3079b501a4 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -109,6 +109,9 @@ interface CordaRPCOps : RPCOps { @RPCReturnsObservables fun stateMachinesFeed(): DataFeed, StateMachineUpdate> + /** Dump all the current flow checkpoints as JSON into a zip file in the node's directory. */ + fun dumpCheckpoints() + /** * Returns a snapshot of vault states for a given query criteria (and optional order and paging specification) * diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index d5432bff8e..aad8affa82 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -62,6 +62,7 @@ import net.corda.node.services.network.NetworkMapUpdater import net.corda.node.services.network.NodeInfoWatcher import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.persistence.* +import net.corda.node.services.rpc.CheckpointDumper import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.* import net.corda.node.services.transactions.InMemoryTransactionVerifierService @@ -257,16 +258,23 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } /** The implementation of the [CordaRPCOps] interface used by this node. */ - open fun makeRPCOps(cordappLoader: CordappLoader): CordaRPCOps { - val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit { stop() } }.also { it.closeOnStop() } + open fun makeRPCOps(cordappLoader: CordappLoader, checkpointDumper: CheckpointDumper): CordaRPCOps { + val ops: CordaRPCOps = CordaRPCOpsImpl( + services, + smm, + flowStarter, + checkpointDumper + ) { + shutdownExecutor.submit(::stop) + }.also { it.closeOnStop() } val proxies = mutableListOf<(CordaRPCOps) -> CordaRPCOps>() // Mind that order is relevant here. proxies += ::AuthenticatedRpcOpsProxy if (!configuration.devMode) { - proxies += { it -> ExceptionMaskingRpcOpsProxy(it, true) } + proxies += { ExceptionMaskingRpcOpsProxy(it, true) } } - proxies += { it -> ExceptionSerialisingRpcOpsProxy(it, configuration.devMode) } - proxies += { it -> ThreadContextAdjustingRpcOpsProxy(it, cordappLoader.appClassLoader) } + proxies += { ExceptionSerialisingRpcOpsProxy(it, configuration.devMode) } + proxies += { ThreadContextAdjustingRpcOpsProxy(it, cordappLoader.appClassLoader) } return proxies.fold(ops) { delegate, decorate -> decorate(delegate) } } @@ -327,7 +335,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, installCoreFlows() registerCordappFlows() services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } - val rpcOps = makeRPCOps(cordappLoader) + val checkpointDumper = CheckpointDumper(checkpointStorage, database, services) + val rpcOps = makeRPCOps(cordappLoader, checkpointDumper) startShell() networkMapClient?.start(trustRoot) @@ -388,7 +397,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, tokenizableServices = null verifyCheckpointsCompatible(frozenTokenizableServices) - + checkpointDumper.start(frozenTokenizableServices) smm.start(frozenTokenizableServices) // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 9019e0b7e3..40efde217c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -31,6 +31,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.loggerFor import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.rpc.CheckpointDumper import net.corda.node.services.rpc.context import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.exceptions.NonRpcFlowException @@ -51,6 +52,7 @@ internal class CordaRPCOpsImpl( private val services: ServiceHubInternal, private val smm: StateMachineManager, private val flowStarter: FlowStarter, + private val checkpointDumper: CheckpointDumper, private val shutdownNode: () -> Unit ) : CordaRPCOps, AutoCloseable { @@ -130,6 +132,8 @@ internal class CordaRPCOpsImpl( return services.validatedTransactions.track() } + override fun dumpCheckpoints() = checkpointDumper.dump() + override fun stateMachinesSnapshot(): List { val (snapshot, updates) = stateMachinesFeed() updates.notUsed() @@ -298,17 +302,21 @@ internal class CordaRPCOpsImpl( override fun shutdown() = terminate(false) override fun terminate(drainPendingFlows: Boolean) { - if (drainPendingFlows) { logger.info("Waiting for pending flows to complete before shutting down.") setFlowsDrainingModeEnabled(true) - drainingShutdownHook.set(pendingFlowsCount().updates.doOnNext {(completed, total) -> - logger.info("Pending flows progress before shutdown: $completed / $total.") - }.doOnCompleted { setPersistentDrainingModeProperty(false, false) }.doOnCompleted(::cancelDrainingShutdownHook).doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") }.doOnCompleted(shutdownNode::invoke).subscribe({ - // Nothing to do on each update here, only completion matters. - }, { error -> - logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) - })) + val subscription = pendingFlowsCount() + .updates + .doOnNext { (completed, total) -> logger.info("Pending flows progress before shutdown: $completed / $total.") } + .doOnCompleted { setPersistentDrainingModeProperty(enabled = false, propagateChange = false) } + .doOnCompleted(::cancelDrainingShutdownHook) + .doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") } + .doOnCompleted(shutdownNode::invoke) + .subscribe( + { }, // Nothing to do on each update here, only completion matters. + { error -> logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) } + ) + drainingShutdownHook.set(subscription) } else { shutdownNode.invoke() } @@ -317,19 +325,19 @@ internal class CordaRPCOpsImpl( override fun isWaitingForShutdown() = drainingShutdownHook.get() != null override fun close() { - cancelDrainingShutdownHook() } private fun cancelDrainingShutdownHook() { - drainingShutdownHook.getAndSet(null)?.let { it.unsubscribe() logger.info("Cancelled draining shutdown hook.") } } - private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) = services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange) + private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) { + services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange) + } private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context) diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt new file mode 100644 index 0000000000..2a2e540cf6 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt @@ -0,0 +1,298 @@ +package net.corda.node.services.rpc + +import co.paralleluniverse.fibers.Stack +import com.fasterxml.jackson.annotation.JsonAutoDetect +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.annotation.JsonValue +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.core.util.DefaultIndenter +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter +import com.fasterxml.jackson.databind.* +import com.fasterxml.jackson.databind.module.SimpleModule +import com.fasterxml.jackson.databind.ser.BeanPropertyWriter +import com.fasterxml.jackson.databind.ser.BeanSerializerModifier +import net.corda.client.jackson.JacksonSupport +import net.corda.client.jackson.internal.jsonObject +import net.corda.core.context.InvocationOrigin +import net.corda.core.contracts.Attachment +import net.corda.core.contracts.ScheduledStateRef +import net.corda.core.contracts.StateRef +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowInfo +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.* +import net.corda.core.serialization.SerializeAsToken +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.internal.CheckpointSerializationContext +import net.corda.core.serialization.internal.CheckpointSerializationDefaults +import net.corda.core.serialization.internal.checkpointDeserialize +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.ProgressTracker +import net.corda.node.services.api.CheckpointStorage +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.statemachine.* +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl +import net.corda.serialization.internal.withTokenContext +import java.time.Instant +import java.time.LocalTime +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter +import java.util.* +import java.util.zip.ZipEntry +import java.util.zip.ZipOutputStream + +class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHubInternal) { + companion object { + private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC) + } + + private lateinit var checkpointSerializationContext: CheckpointSerializationContext + private lateinit var writer: ObjectWriter + + fun start(tokenizableServices: List) { + checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( + CheckpointSerializeAsTokenContextImpl( + tokenizableServices, + CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER, + CheckpointSerializationDefaults.CHECKPOINT_CONTEXT, + serviceHub + ) + ) + + val mapper = JacksonSupport.createNonRpcMapper() + mapper.registerModule(SimpleModule().apply { + setSerializerModifier(CheckpointDumperBeanModifier) + addSerializer(FlowSessionImplSerializer) + addSerializer(MapSerializer) + addSerializer(AttachmentSerializer) + setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java) + setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java) + }) + val prettyPrinter = DefaultPrettyPrinter().apply { + indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE) + } + writer = mapper.writer(prettyPrinter) + } + + fun dump() { + val file = serviceHub.configuration.baseDirectory / "checkpoints_dump-${TIME_FORMATTER.format(serviceHub.clock.instant())}.zip" + database.transaction { + checkpointStorage.getAllCheckpoints().use { stream -> + ZipOutputStream(file.outputStream()).use { zip -> + stream.forEach { (runId, serialisedCheckpoint) -> + val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) + val json = checkpoint.toJson(runId.uuid) + val jsonBytes = writer.writeValueAsBytes(json) + zip.putNextEntry(ZipEntry("${runId.uuid}.json")) + zip.write(jsonBytes) + zip.closeEntry() + } + } + } + } + } + + private val formatter = DateTimeFormatter.ofPattern("HHmmss.SSS") + + fun dump(id: UUID, checkpoint: Checkpoint) { + val info = checkpoint.toJson(id) + val file = serviceHub.configuration.baseDirectory / "$id-${formatter.format(LocalTime.now(serviceHub.clock))}.json" + writer.writeValue(file.toFile(), info) + } + + private fun Checkpoint.toJson(id: UUID): CheckpointJson { + val (fiber, flowLogic) = when (flowState) { + is FlowState.Unstarted -> { + null to flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext) + } + is FlowState.Started -> { + val fiber = flowState.frozenFiber.checkpointDeserialize(context = checkpointSerializationContext) + fiber to fiber.logic + } + } + + val flowCallStack = if (fiber != null) { + // Poke into Quasar's stack and find the object references to the sub-flows so that we can correctly get the current progress + // step for each sub-call. + val stackObjects = fiber.declaredField("stack").value.declaredField>("dataObject").value + subFlowStack.map { subFlow -> + val subFlowLogic = stackObjects.find(subFlow.flowClass::isInstance) as? FlowLogic<*> + val currentStep = subFlowLogic?.progressTracker?.currentStep + FlowCall(subFlow.flowClass, if (currentStep == ProgressTracker.UNSTARTED) null else currentStep?.label) + }.reversed() + } else { + emptyList() + } + + return CheckpointJson( + id, + flowLogic.javaClass, + flowLogic, + flowCallStack, + (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(), + invocationContext.origin.toOrigin(), + ourIdentity, + sessions.mapNotNull { it.value.toActiveSession(it.key) }, + errorState as? ErrorState.Errored + ) + } + + @Suppress("unused") + private class FlowCall(val flowClass: Class<*>, val progressStep: String?) + + @Suppress("unused") + @JsonInclude(Include.NON_NULL) + private class Origin( + val rpc: String? = null, + val peer: CordaX500Name? = null, + val service: String? = null, + val scheduled: ScheduledStateRef? = null, + val shell: InvocationOrigin.Shell? = null + ) + + private fun InvocationOrigin.toOrigin(): Origin { + return when (this) { + is InvocationOrigin.RPC -> Origin(rpc = actor.id.value) + is InvocationOrigin.Peer -> Origin(peer = party) + is InvocationOrigin.Service -> Origin(service = serviceClassName) + is InvocationOrigin.Scheduled -> Origin(scheduled = scheduledState) + is InvocationOrigin.Shell -> Origin(shell = this) + } + } + + @Suppress("unused") + private class CheckpointJson( + val id: UUID, + val flowLogicClass: Class>, + val flowLogic: FlowLogic<*>, + val flowCallStack: List, + val suspendedOn: SuspendedOn?, + val origin: Origin, + val ourIdentity: Party, + val activeSessions: List, + val errored: ErrorState.Errored? + ) + + @Suppress("unused") + @JsonInclude(Include.NON_NULL) + private class SuspendedOn( + val send: List? = null, + val receive: NonEmptySet? = null, + val sendAndReceive: List? = null, + val waitForLedgerCommit: SecureHash? = null, + val waitForStateConsumption: Set? = null, + val getFlowInfo: NonEmptySet? = null, + val sleepTill: Instant? = null, + val waitForSessionConfirmations: FlowIORequest.WaitForSessionConfirmations? = null, + val customOperation: FlowIORequest.ExecuteAsyncOperation<*>? = null, + val forceCheckpoint: FlowIORequest.ForceCheckpoint? = null + ) + + @Suppress("unused") + private class SendJson(val session: FlowSession, val sentPayloadType: Class<*>, val sentPayload: Any) + + private fun FlowIORequest<*>.toSuspendedOn(): SuspendedOn { + fun Map>.toJson(): List { + return map { + val payload = it.value.deserialize() + SendJson(it.key, payload.javaClass, payload) + } + } + + return when (this) { + is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson()) + is FlowIORequest.Receive -> SuspendedOn(receive = sessions) + is FlowIORequest.SendAndReceive -> SuspendedOn(sendAndReceive = sessionToMessage.toJson()) + is FlowIORequest.WaitForLedgerCommit -> SuspendedOn(waitForLedgerCommit = hash) + is FlowIORequest.GetFlowInfo -> SuspendedOn(getFlowInfo = sessions) + is FlowIORequest.Sleep -> SuspendedOn(sleepTill = wakeUpAfter) + is FlowIORequest.WaitForSessionConfirmations -> SuspendedOn(waitForSessionConfirmations = this) + is FlowIORequest.ForceCheckpoint -> SuspendedOn(forceCheckpoint = this) + is FlowIORequest.ExecuteAsyncOperation -> { + when (operation) { + is WaitForStateConsumption -> SuspendedOn(waitForStateConsumption = (operation as WaitForStateConsumption).stateRefs) + else -> SuspendedOn(customOperation = this) + } + } + } + } + + @Suppress("unused") + private class ActiveSession( + val peer: Party, + val ourSessionId: SessionId, + val receivedMessages: List, + val errors: List, + val peerFlowInfo: FlowInfo, + val peerSessionId: SessionId? + ) + + private fun SessionState.toActiveSession(sessionId: SessionId): ActiveSession? { + return if (this is SessionState.Initiated) { + val peerSessionId = (initiatedState as? InitiatedSessionState.Live)?.peerSinkSessionId + ActiveSession(peerParty, sessionId, receivedMessages, errors, peerFlowInfo, peerSessionId) + } else { + null + } + } + + @Suppress("unused") + private interface SessionIdMixin { + @get:JsonValue + val toLong: Long + } + + @JsonAutoDetect(getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) + private interface FlowLogicMixin + + private object CheckpointDumperBeanModifier : BeanSerializerModifier() { + override fun changeProperties(config: SerializationConfig, + beanDesc: BeanDescription, + beanProperties: MutableList): MutableList { + // Remove references to any node singletons + beanProperties.removeIf { it.type.isTypeOrSubTypeOf(SerializeAsToken::class.java) } + if (FlowLogic::class.java.isAssignableFrom(beanDesc.beanClass)) { + beanProperties.removeIf { + it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap" + } + } + return beanProperties + } + } + + private object FlowSessionImplSerializer : JsonSerializer() { + override fun serialize(value: FlowSessionImpl, gen: JsonGenerator, serializers: SerializerProvider) { + gen.jsonObject { + writeObjectField("peer", value.counterparty) + writeObjectField("ourSessionId", value.sourceSessionId) + } + } + override fun handledType(): Class = FlowSessionImpl::class.java + } + + private object AttachmentSerializer : JsonSerializer() { + override fun serialize(value: Attachment, gen: JsonGenerator, serializers: SerializerProvider) = gen.writeObject(value.id) + override fun handledType(): Class = Attachment::class.java + } + + private object MapSerializer : JsonSerializer>() { + override fun serialize(map: Map, gen: JsonGenerator, serializers: SerializerProvider) { + gen.writeStartArray(map.size) + map.forEach { key, value -> + gen.jsonObject { + writeObjectField("key", key) + writeObjectField("value", value) + } + } + gen.writeEndArray() + } + override fun handledType(): Class> = uncheckedCast(Map::class.java) + } +}