ENT-3496: dumpCheckpoints RPC

Dumps all the node's checkpoints as JSON into a single zip file in the node's directory. The output contains:

* All the fields for the top-level flow
* The current sub-flow call stack, along with the current progress tracker step for each sub-flow
* The event that suspended the flow, which if it's a send or sendAndReceive will show the payload that was sent
* Low level information on the active sessions with other peers
This commit is contained in:
Shams Asari 2019-05-24 17:21:27 +01:00
parent aa75157273
commit 5619debf18
5 changed files with 341 additions and 21 deletions

View File

@ -3,7 +3,7 @@
package net.corda.client.jackson.internal
import com.fasterxml.jackson.annotation.*
import com.fasterxml.jackson.annotation.JsonCreator.Mode.*
import com.fasterxml.jackson.annotation.JsonCreator.Mode.DISABLED
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParseException
@ -34,6 +34,7 @@ import net.corda.core.internal.DigitalSignatureWithCert
import net.corda.core.internal.createComponentGroups
import net.corda.core.internal.kotlinObjectInstance
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -43,7 +44,8 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.parseAsHex
import net.corda.core.utilities.toHexString
import net.corda.serialization.internal.AllWhitelist
import net.corda.serialization.internal.amqp.*
import net.corda.serialization.internal.amqp.SerializerFactoryBuilder
import net.corda.serialization.internal.amqp.hasCordaSerializable
import java.math.BigDecimal
import java.security.PublicKey
import java.security.cert.CertPath
@ -97,7 +99,7 @@ private class CordaSerializableBeanSerializerModifier : BeanSerializerModifier()
beanDesc: BeanDescription,
beanProperties: MutableList<BeanPropertyWriter>): MutableList<BeanPropertyWriter> {
val beanClass = beanDesc.beanClass
if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null) {
if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null && !SerializeAsToken::class.java.isAssignableFrom(beanClass)) {
val typeInformation = serializerFactory.getTypeInformation(beanClass)
val properties = typeInformation.propertiesOrEmptyMap
val amqpProperties = properties.mapNotNull { (name, property) ->

View File

@ -109,6 +109,9 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate>
/** Dump all the current flow checkpoints as JSON into a zip file in the node's directory. */
fun dumpCheckpoints()
/**
* Returns a snapshot of vault states for a given query criteria (and optional order and paging specification)
*

View File

@ -62,6 +62,7 @@ import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.*
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@ -257,16 +258,23 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(cordappLoader: CordappLoader): CordaRPCOps {
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit { stop() } }.also { it.closeOnStop() }
open fun makeRPCOps(cordappLoader: CordappLoader, checkpointDumper: CheckpointDumper): CordaRPCOps {
val ops: CordaRPCOps = CordaRPCOpsImpl(
services,
smm,
flowStarter,
checkpointDumper
) {
shutdownExecutor.submit(::stop)
}.also { it.closeOnStop() }
val proxies = mutableListOf<(CordaRPCOps) -> CordaRPCOps>()
// Mind that order is relevant here.
proxies += ::AuthenticatedRpcOpsProxy
if (!configuration.devMode) {
proxies += { it -> ExceptionMaskingRpcOpsProxy(it, true) }
proxies += { ExceptionMaskingRpcOpsProxy(it, true) }
}
proxies += { it -> ExceptionSerialisingRpcOpsProxy(it, configuration.devMode) }
proxies += { it -> ThreadContextAdjustingRpcOpsProxy(it, cordappLoader.appClassLoader) }
proxies += { ExceptionSerialisingRpcOpsProxy(it, configuration.devMode) }
proxies += { ThreadContextAdjustingRpcOpsProxy(it, cordappLoader.appClassLoader) }
return proxies.fold(ops) { delegate, decorate -> decorate(delegate) }
}
@ -327,7 +335,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
installCoreFlows()
registerCordappFlows()
services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
val rpcOps = makeRPCOps(cordappLoader)
val checkpointDumper = CheckpointDumper(checkpointStorage, database, services)
val rpcOps = makeRPCOps(cordappLoader, checkpointDumper)
startShell()
networkMapClient?.start(trustRoot)
@ -388,7 +397,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
tokenizableServices = null
verifyCheckpointsCompatible(frozenTokenizableServices)
checkpointDumper.start(frozenTokenizableServices)
smm.start(frozenTokenizableServices)
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }

View File

@ -31,6 +31,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.node.services.rpc.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.NonRpcFlowException
@ -51,6 +52,7 @@ internal class CordaRPCOpsImpl(
private val services: ServiceHubInternal,
private val smm: StateMachineManager,
private val flowStarter: FlowStarter,
private val checkpointDumper: CheckpointDumper,
private val shutdownNode: () -> Unit
) : CordaRPCOps, AutoCloseable {
@ -130,6 +132,8 @@ internal class CordaRPCOpsImpl(
return services.validatedTransactions.track()
}
override fun dumpCheckpoints() = checkpointDumper.dump()
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
val (snapshot, updates) = stateMachinesFeed()
updates.notUsed()
@ -298,17 +302,21 @@ internal class CordaRPCOpsImpl(
override fun shutdown() = terminate(false)
override fun terminate(drainPendingFlows: Boolean) {
if (drainPendingFlows) {
logger.info("Waiting for pending flows to complete before shutting down.")
setFlowsDrainingModeEnabled(true)
drainingShutdownHook.set(pendingFlowsCount().updates.doOnNext {(completed, total) ->
logger.info("Pending flows progress before shutdown: $completed / $total.")
}.doOnCompleted { setPersistentDrainingModeProperty(false, false) }.doOnCompleted(::cancelDrainingShutdownHook).doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") }.doOnCompleted(shutdownNode::invoke).subscribe({
// Nothing to do on each update here, only completion matters.
}, { error ->
logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error)
}))
val subscription = pendingFlowsCount()
.updates
.doOnNext { (completed, total) -> logger.info("Pending flows progress before shutdown: $completed / $total.") }
.doOnCompleted { setPersistentDrainingModeProperty(enabled = false, propagateChange = false) }
.doOnCompleted(::cancelDrainingShutdownHook)
.doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") }
.doOnCompleted(shutdownNode::invoke)
.subscribe(
{ }, // Nothing to do on each update here, only completion matters.
{ error -> logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }
)
drainingShutdownHook.set(subscription)
} else {
shutdownNode.invoke()
}
@ -317,19 +325,19 @@ internal class CordaRPCOpsImpl(
override fun isWaitingForShutdown() = drainingShutdownHook.get() != null
override fun close() {
cancelDrainingShutdownHook()
}
private fun cancelDrainingShutdownHook() {
drainingShutdownHook.getAndSet(null)?.let {
it.unsubscribe()
logger.info("Cancelled draining shutdown hook.")
}
}
private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) = services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange)
private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) {
services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange)
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)

View File

@ -0,0 +1,298 @@
package net.corda.node.services.rpc
import co.paralleluniverse.fibers.Stack
import com.fasterxml.jackson.annotation.JsonAutoDetect
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.util.DefaultIndenter
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.internal.jsonObject
import net.corda.core.context.InvocationOrigin
import net.corda.core.contracts.Attachment
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext
import java.time.Instant
import java.time.LocalTime
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHubInternal) {
companion object {
private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
}
private lateinit var checkpointSerializationContext: CheckpointSerializationContext
private lateinit var writer: ObjectWriter
fun start(tokenizableServices: List<Any>) {
checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
CheckpointSerializeAsTokenContextImpl(
tokenizableServices,
CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER,
CheckpointSerializationDefaults.CHECKPOINT_CONTEXT,
serviceHub
)
)
val mapper = JacksonSupport.createNonRpcMapper()
mapper.registerModule(SimpleModule().apply {
setSerializerModifier(CheckpointDumperBeanModifier)
addSerializer(FlowSessionImplSerializer)
addSerializer(MapSerializer)
addSerializer(AttachmentSerializer)
setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java)
setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java)
})
val prettyPrinter = DefaultPrettyPrinter().apply {
indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE)
}
writer = mapper.writer(prettyPrinter)
}
fun dump() {
val file = serviceHub.configuration.baseDirectory / "checkpoints_dump-${TIME_FORMATTER.format(serviceHub.clock.instant())}.zip"
database.transaction {
checkpointStorage.getAllCheckpoints().use { stream ->
ZipOutputStream(file.outputStream()).use { zip ->
stream.forEach { (runId, serialisedCheckpoint) ->
val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
val json = checkpoint.toJson(runId.uuid)
val jsonBytes = writer.writeValueAsBytes(json)
zip.putNextEntry(ZipEntry("${runId.uuid}.json"))
zip.write(jsonBytes)
zip.closeEntry()
}
}
}
}
}
private val formatter = DateTimeFormatter.ofPattern("HHmmss.SSS")
fun dump(id: UUID, checkpoint: Checkpoint) {
val info = checkpoint.toJson(id)
val file = serviceHub.configuration.baseDirectory / "$id-${formatter.format(LocalTime.now(serviceHub.clock))}.json"
writer.writeValue(file.toFile(), info)
}
private fun Checkpoint.toJson(id: UUID): CheckpointJson {
val (fiber, flowLogic) = when (flowState) {
is FlowState.Unstarted -> {
null to flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext)
}
is FlowState.Started -> {
val fiber = flowState.frozenFiber.checkpointDeserialize(context = checkpointSerializationContext)
fiber to fiber.logic
}
}
val flowCallStack = if (fiber != null) {
// Poke into Quasar's stack and find the object references to the sub-flows so that we can correctly get the current progress
// step for each sub-call.
val stackObjects = fiber.declaredField<Stack>("stack").value.declaredField<Array<*>>("dataObject").value
subFlowStack.map { subFlow ->
val subFlowLogic = stackObjects.find(subFlow.flowClass::isInstance) as? FlowLogic<*>
val currentStep = subFlowLogic?.progressTracker?.currentStep
FlowCall(subFlow.flowClass, if (currentStep == ProgressTracker.UNSTARTED) null else currentStep?.label)
}.reversed()
} else {
emptyList()
}
return CheckpointJson(
id,
flowLogic.javaClass,
flowLogic,
flowCallStack,
(flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(),
invocationContext.origin.toOrigin(),
ourIdentity,
sessions.mapNotNull { it.value.toActiveSession(it.key) },
errorState as? ErrorState.Errored
)
}
@Suppress("unused")
private class FlowCall(val flowClass: Class<*>, val progressStep: String?)
@Suppress("unused")
@JsonInclude(Include.NON_NULL)
private class Origin(
val rpc: String? = null,
val peer: CordaX500Name? = null,
val service: String? = null,
val scheduled: ScheduledStateRef? = null,
val shell: InvocationOrigin.Shell? = null
)
private fun InvocationOrigin.toOrigin(): Origin {
return when (this) {
is InvocationOrigin.RPC -> Origin(rpc = actor.id.value)
is InvocationOrigin.Peer -> Origin(peer = party)
is InvocationOrigin.Service -> Origin(service = serviceClassName)
is InvocationOrigin.Scheduled -> Origin(scheduled = scheduledState)
is InvocationOrigin.Shell -> Origin(shell = this)
}
}
@Suppress("unused")
private class CheckpointJson(
val id: UUID,
val flowLogicClass: Class<FlowLogic<*>>,
val flowLogic: FlowLogic<*>,
val flowCallStack: List<FlowCall>,
val suspendedOn: SuspendedOn?,
val origin: Origin,
val ourIdentity: Party,
val activeSessions: List<ActiveSession>,
val errored: ErrorState.Errored?
)
@Suppress("unused")
@JsonInclude(Include.NON_NULL)
private class SuspendedOn(
val send: List<SendJson>? = null,
val receive: NonEmptySet<FlowSession>? = null,
val sendAndReceive: List<SendJson>? = null,
val waitForLedgerCommit: SecureHash? = null,
val waitForStateConsumption: Set<StateRef>? = null,
val getFlowInfo: NonEmptySet<FlowSession>? = null,
val sleepTill: Instant? = null,
val waitForSessionConfirmations: FlowIORequest.WaitForSessionConfirmations? = null,
val customOperation: FlowIORequest.ExecuteAsyncOperation<*>? = null,
val forceCheckpoint: FlowIORequest.ForceCheckpoint? = null
)
@Suppress("unused")
private class SendJson(val session: FlowSession, val sentPayloadType: Class<*>, val sentPayload: Any)
private fun FlowIORequest<*>.toSuspendedOn(): SuspendedOn {
fun Map<FlowSession, SerializedBytes<Any>>.toJson(): List<SendJson> {
return map {
val payload = it.value.deserialize()
SendJson(it.key, payload.javaClass, payload)
}
}
return when (this) {
is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson())
is FlowIORequest.Receive -> SuspendedOn(receive = sessions)
is FlowIORequest.SendAndReceive -> SuspendedOn(sendAndReceive = sessionToMessage.toJson())
is FlowIORequest.WaitForLedgerCommit -> SuspendedOn(waitForLedgerCommit = hash)
is FlowIORequest.GetFlowInfo -> SuspendedOn(getFlowInfo = sessions)
is FlowIORequest.Sleep -> SuspendedOn(sleepTill = wakeUpAfter)
is FlowIORequest.WaitForSessionConfirmations -> SuspendedOn(waitForSessionConfirmations = this)
is FlowIORequest.ForceCheckpoint -> SuspendedOn(forceCheckpoint = this)
is FlowIORequest.ExecuteAsyncOperation -> {
when (operation) {
is WaitForStateConsumption -> SuspendedOn(waitForStateConsumption = (operation as WaitForStateConsumption).stateRefs)
else -> SuspendedOn(customOperation = this)
}
}
}
}
@Suppress("unused")
private class ActiveSession(
val peer: Party,
val ourSessionId: SessionId,
val receivedMessages: List<DataSessionMessage>,
val errors: List<FlowError>,
val peerFlowInfo: FlowInfo,
val peerSessionId: SessionId?
)
private fun SessionState.toActiveSession(sessionId: SessionId): ActiveSession? {
return if (this is SessionState.Initiated) {
val peerSessionId = (initiatedState as? InitiatedSessionState.Live)?.peerSinkSessionId
ActiveSession(peerParty, sessionId, receivedMessages, errors, peerFlowInfo, peerSessionId)
} else {
null
}
}
@Suppress("unused")
private interface SessionIdMixin {
@get:JsonValue
val toLong: Long
}
@JsonAutoDetect(getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
private interface FlowLogicMixin
private object CheckpointDumperBeanModifier : BeanSerializerModifier() {
override fun changeProperties(config: SerializationConfig,
beanDesc: BeanDescription,
beanProperties: MutableList<BeanPropertyWriter>): MutableList<BeanPropertyWriter> {
// Remove references to any node singletons
beanProperties.removeIf { it.type.isTypeOrSubTypeOf(SerializeAsToken::class.java) }
if (FlowLogic::class.java.isAssignableFrom(beanDesc.beanClass)) {
beanProperties.removeIf {
it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap"
}
}
return beanProperties
}
}
private object FlowSessionImplSerializer : JsonSerializer<FlowSessionImpl>() {
override fun serialize(value: FlowSessionImpl, gen: JsonGenerator, serializers: SerializerProvider) {
gen.jsonObject {
writeObjectField("peer", value.counterparty)
writeObjectField("ourSessionId", value.sourceSessionId)
}
}
override fun handledType(): Class<FlowSessionImpl> = FlowSessionImpl::class.java
}
private object AttachmentSerializer : JsonSerializer<Attachment>() {
override fun serialize(value: Attachment, gen: JsonGenerator, serializers: SerializerProvider) = gen.writeObject(value.id)
override fun handledType(): Class<Attachment> = Attachment::class.java
}
private object MapSerializer : JsonSerializer<Map<Any, Any>>() {
override fun serialize(map: Map<Any, Any>, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeStartArray(map.size)
map.forEach { key, value ->
gen.jsonObject {
writeObjectField("key", key)
writeObjectField("value", value)
}
}
gen.writeEndArray()
}
override fun handledType(): Class<Map<Any, Any>> = uncheckedCast(Map::class.java)
}
}