mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
CORDA-3094 improvements to checkpoint dumper (#5324)
- Handle errors in Jackson and checkpoint deserialisation. A file notifying the user that the checkpoint dump failed is created when errors occur. - Handle message deserialisation errors. A string placeholder is used if an error occurs. - Add more information about subflows (include their `FlowLogic`) - Increase clarity in checkpoint output field names * CORDA-3904 Add `flowCallStackSummary` Add `flowCallStackSummary` to the output which contains the same content as `flowCallStack` minus each subflow's `FlowLogic`. The `FlowLogic` contains a ton of info which is normally repeated in each subflow. Adding the summary gives an overview of the steps the flow executed and which step it is currently on. The `suspendedOn` field is put underneath the summary and the original call stack is moved below the suspended info. This puts the most useful information towards the top of the json file.
This commit is contained in:
parent
72ac722451
commit
a477d59c15
@ -1,17 +1,151 @@
|
||||
Checkpoint Agent
|
||||
Checkpoint Tooling
|
||||
================
|
||||
|
||||
Firstly, please ensure you understand the mechanics and principles of Corda Flows by reading :doc:`key-concepts-flows` and :doc:`flow-state-machines`.
|
||||
We also recommend you understand the purpose and behaviour of the :doc:`node-flow-hospital` in relation to *checkpoints* and flow recovery.
|
||||
This page contains information about checkpoint tooling. These tools can be used to debug the causes of stuck flows.
|
||||
|
||||
Before reading this page, please ensure you understand the mechanics and principles of Corda Flows by reading :doc:`key-concepts-flows` and :doc:`flow-state-machines`.
|
||||
It is also recommended that you understand the purpose and behaviour of the :doc:`node-flow-hospital` in relation to *checkpoints* and flow recovery.
|
||||
An advanced explanation of :ref:`*checkpoints* <flow_internals_checkpoints_ref>` within the flow state machine can be found here: :doc:`contributing-flow-internals`.
|
||||
|
||||
As a recap,
|
||||
.. note:: As a recap,
|
||||
|
||||
"A flow *checkpoint* is a serialised snapshot of the flow's stack frames and any objects reachable from the stack. Checkpoints are saved to
|
||||
the database automatically when a flow suspends or resumes, which typically happens when sending or receiving messages. A flow may be replayed
|
||||
from the last checkpoint if the node restarts. Automatic checkpointing is an unusual feature of Corda and significantly helps developers write
|
||||
reliable code that can survive node restarts and crashes. It also assists with scaling up, as flows that are waiting for a response can be flushed
|
||||
from memory."
|
||||
A flow *checkpoint* is a serialised snapshot of the flow's stack frames and any objects reachable from the stack. Checkpoints are saved to
|
||||
the database automatically when a flow suspends or resumes, which typically happens when sending or receiving messages. A flow may be replayed
|
||||
from the last checkpoint if the node restarts. Automatic checkpointing is an unusual feature of Corda and significantly helps developers write
|
||||
reliable code that can survive node restarts and crashes. It also assists with scaling up, as flows that are waiting for a response can be flushed
|
||||
from memory.
|
||||
|
||||
The checkpoint tools available are:
|
||||
|
||||
- :ref:`Checkpoint dumper <checkpoint_dumper>`
|
||||
- :ref:`Checkpoint agent <checkpoint_agent>`
|
||||
|
||||
.. _checkpoint_dumper:
|
||||
|
||||
Checkpoint dumper
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
The checkpoint dumper outputs information about flows running on a node. This is useful for diagnosing the causes of stuck flows. Using the generated output,
|
||||
corrective actions can be taken to resolve the issues flows are facing. One possible solution, is ending a flow using the ``flow kill`` command.
|
||||
|
||||
.. warning:: Deleting checkpoints manually or via ``flow kill``/```killFlow`` can lead to an inconsistent ledger among transacting parties. Great care
|
||||
and coordination with a flow's counterparties must be taken to ensure that a initiating flow and flows responding to it are correctly
|
||||
removed. This experience will be improved in the future. Making it easier to kill flows while notifying their counterparties.
|
||||
|
||||
To retrieve this information, execute ``run dumpCheckpoints`` in the node's shell. The command creates a zip and generates a JSON file for each flow.
|
||||
|
||||
- Each file follows the naming format ``<flow name>-<flow id>.json`` (for example, ``CashIssueAndPaymentFlow-90613d6f-be78-41bd-98e1-33a756c28808.json``).
|
||||
- The zip is placed into the ``logs`` directory of the node and is named ``checkpoints_dump-<date and time>.zip`` (for example, ``checkpoints_dump-20190812-153847``).
|
||||
|
||||
Below are some of the more important fields included in the output:
|
||||
|
||||
- ``flowId``: The id of the flow
|
||||
- ``topLevelFlowClass``: The name of the original flow that was invoked (by RPC or a service)
|
||||
- ``topLevelFlowLogic``: Detailed view of the top level flow
|
||||
- ``flowCallStackSummary``: A summarised list of the current stack of sub flows along with any progress tracker information
|
||||
- ``suspendedOn``: The command that the flow is suspended on (e.g. ``SuspendAndReceive``) which includes the ``suspendedTimestamp``
|
||||
- ``flowCallStack`` A detailed view of the of the current stack of sub flows
|
||||
|
||||
.. _checkpoint_dumper_sample_output:
|
||||
|
||||
Sample output
|
||||
-------------
|
||||
|
||||
Below is an example of the JSON output:
|
||||
|
||||
.. sourcecode:: json
|
||||
|
||||
{
|
||||
"flowId" : "90613d6f-be78-41bd-98e1-33a756c28808",
|
||||
"topLevelFlowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"topLevelFlowLogic" : {
|
||||
"amount" : "10.00 USD",
|
||||
"issueRef" : "MTIzNA==",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"notary" : "O=Notary, L=London, C=GB"
|
||||
},
|
||||
"flowCallStackSummary" : [
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"progressStep" : "Paying recipient"
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashPaymentFlow",
|
||||
"progressStep" : "Generating anonymous identities"
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.confidential.SwapIdentitiesFlow",
|
||||
"progressStep" : "Awaiting counterparty's anonymous identity"
|
||||
}
|
||||
],
|
||||
"suspendedOn" : {
|
||||
"sendAndReceive" : [
|
||||
{
|
||||
"session" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : -5024519991106064492
|
||||
},
|
||||
"sentPayloadType" : "net.corda.confidential.SwapIdentitiesFlow$IdentityWithSignature",
|
||||
"sentPayload" : {
|
||||
"identity" : {
|
||||
"class" : "net.corda.core.identity.PartyAndCertificate",
|
||||
"deserialized" : "O=BankOfCorda, L=London, C=GB"
|
||||
},
|
||||
"signature" : "M5DN180OeE4M8jJ3mFohjgeqNYOWXzR6a2PIclJaWyit2uLnmJcZatySoSC12b6e4rQYKIICNFUXRzJnoQTQCg=="
|
||||
}
|
||||
}
|
||||
],
|
||||
"suspendedTimestamp" : "2019-08-12T15:38:39Z",
|
||||
"secondsSpentWaiting" : 7
|
||||
},
|
||||
"flowCallStack" : [
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"progressStep" : "Paying recipient",
|
||||
"flowLogic" : {
|
||||
"amount" : "10.00 USD",
|
||||
"issueRef" : "MTIzNA==",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"notary" : "O=Notary, L=London, C=GB"
|
||||
}
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashPaymentFlow",
|
||||
"progressStep" : "Generating anonymous identities",
|
||||
"flowLogic" : {
|
||||
"amount" : "10.00 USD",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"issuerConstraint" : [ ],
|
||||
"notary" : "O=Notary, L=London, C=GB"
|
||||
}
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.confidential.SwapIdentitiesFlow",
|
||||
"progressStep" : "Awaiting counterparty's anonymous identity",
|
||||
"flowLogic" : {
|
||||
"otherSideSession" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : -5024519991106064492
|
||||
},
|
||||
"otherParty" : null
|
||||
}
|
||||
}
|
||||
],
|
||||
"origin" : {
|
||||
"rpc" : "bankUser"
|
||||
},
|
||||
"ourIdentity" : "O=BankOfCorda, L=London, C=GB",
|
||||
"activeSessions" : [ ],
|
||||
"errored" : null
|
||||
}
|
||||
|
||||
.. _checkpoint_agent:
|
||||
|
||||
Checkpoint Agent
|
||||
~~~~~~~~~~~~~~~~
|
||||
|
||||
The Checkpoint Agent is a very low level diagnostics tool that can be used to output the type, size and content of flow *checkpoints* at node runtime.
|
||||
It is primarily targeted at users developing and testing code that may exhibit flow mis-behaviour (preferably before going into production).
|
||||
@ -39,7 +173,7 @@ The agent can be customised with a number of optional parameters described below
|
||||
``-Dcapsule.jvm.args=-javaagent:checkpoint-agent.jar[=arg=value,...]``
|
||||
|
||||
Configuration
|
||||
~~~~~~~~~~~~~
|
||||
-------------
|
||||
|
||||
The checkpoint agent can be started with the following optional parameters:
|
||||
|
||||
@ -63,9 +197,9 @@ These arguments are passed to the JVM along with the agent specification. For ex
|
||||
.. note:: Arguments may be passed into the agent in any order and should **not** contain spaces between them.
|
||||
|
||||
Checkpoint Dump support
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
-----------------------
|
||||
|
||||
When used in combination with the ``dumpCheckpoints`` shell command (see :ref:`Upgrading CorDapps <upgrading-cordapps-flow-drains>`.),
|
||||
When used in combination with the ``dumpCheckpoints`` shell command (see :ref:`Checkpoint Dumper <checkpoint_dumper>`),
|
||||
the checkpoint agent will automatically output additional diagnostic information for all checkpoints dumped by the aforementioned tool.
|
||||
|
||||
You should therefore see two different output files upon invoking the checkpoint dumper command:
|
||||
@ -88,13 +222,13 @@ and use the ``dumpCheckpoints`` shell command to trigger diagnostics collection.
|
||||
determine whether the VM has been instrumented or not at runtime.
|
||||
|
||||
Logging configuration
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
---------------------
|
||||
|
||||
The agent will log output to a log4j2 configured logger.
|
||||
|
||||
It is recommended to configure a separate log file to capture this information by configuring an appender as follows:
|
||||
|
||||
.. sourcecode:: none
|
||||
.. sourcecode:: xml
|
||||
|
||||
<Logger name="CheckpointAgent" level="info" additivity="false">
|
||||
<AppenderRef ref="Checkpoint-Agent-RollingFile-Appender"/>
|
||||
@ -104,7 +238,7 @@ It is recommended to configure a separate log file to capture this information b
|
||||
|
||||
In this instance we are specifying a Rolling File appender with archival rotation as follows:
|
||||
|
||||
.. sourcecode:: none
|
||||
.. sourcecode:: xml
|
||||
|
||||
<RollingFile name="Checkpoint-Agent-RollingFile-Appender"
|
||||
fileName="${log-path}/checkpoints_agent-${date:yyyyMMdd-HHmmss}.log"
|
||||
@ -137,7 +271,7 @@ The *log4j2.xml* containing the above configuration must now be be passed to the
|
||||
-Dlog4j.configurationFile=<PATH>/log4j2.xml
|
||||
|
||||
Sample output
|
||||
~~~~~~~~~~~~~
|
||||
-------------
|
||||
|
||||
Using the *log4j2* configuration described above, the following output is generated to a file called ``checkpoints_agent-<DATE>.log`` under
|
||||
the Corda node ``logs`` directory for a single flow execution (in this case):
|
||||
@ -226,63 +360,98 @@ And two additional files will appear in the nodes logs directory:
|
||||
* ``<NODE_BASE>\logs\checkpoints_agent-20190711-185424.log``
|
||||
|
||||
3. Unzip the ``<NODE_BASE>\logs\checkpoints_dump-<date>.zip`` file, and you should see a file with a matching flow id as above:
|
||||
**CashIssueAndPaymentFlow-90613d6f-be78-41bd-98e1-33a756c28808.jsos**
|
||||
**CashIssueAndPaymentFlow-90613d6f-be78-41bd-98e1-33a756c28808.json**
|
||||
|
||||
It contents will contain the following diagnostics information:
|
||||
Its contents will contain the following diagnostics information:
|
||||
|
||||
.. sourcecode:: none
|
||||
.. sourcecode:: json
|
||||
|
||||
{
|
||||
"id" : "90613d6f-be78-41bd-98e1-33a756c28808",
|
||||
"flowLogicClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"flowLogic" : {
|
||||
"amount" : "200.00 USD",
|
||||
"issueRef" : "AQ==",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"notary" : "O=Notary Service, L=Zurich, C=CH"
|
||||
},
|
||||
"flowCallStack" : [
|
||||
{
|
||||
"flowClass" : "net.corda.confidential.SwapIdentitiesFlow",
|
||||
"progressStep" : "Awaiting counterparty's anonymous identity"
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashPaymentFlow",
|
||||
"progressStep" : "Generating anonymous identities"
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"progressStep" : "Paying recipient"
|
||||
}
|
||||
],
|
||||
"suspendedOn" : {
|
||||
"sendAndReceive" : [
|
||||
{
|
||||
"session" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : 1443438003030966253
|
||||
},
|
||||
"sentPayloadType" : "net.corda.confidential.SwapIdentitiesFlow$IdentityWithSignature",
|
||||
"sentPayload" : {
|
||||
"identity" : {
|
||||
"class" : "net.corda.core.identity.PartyAndCertificate",
|
||||
"deserialized" : "O=BankOfCorda, L=London, C=GB"
|
||||
},
|
||||
"signature" : "t+7hyUnQE08n3ST4krA/7fi1R8ItdrGvpeEbMFgTBDCHibMWiKo/NaTSVUdfwPmsEtl1PFx0MHz5rtRQ+XuEBg=="
|
||||
}
|
||||
}
|
||||
],
|
||||
"suspendedTimestamp" : "2019-07-10T14:44:58",
|
||||
"secondsSpentWaiting" : 98268
|
||||
},
|
||||
"origin" : {
|
||||
"rpc" : "bankUser"
|
||||
},
|
||||
"ourIdentity" : "O=BankOfCorda, L=London, C=GB",
|
||||
"activeSessions" : [ ],
|
||||
"errored" : null
|
||||
}
|
||||
{
|
||||
"flowId" : "90613d6f-be78-41bd-98e1-33a756c28808",
|
||||
"topLevelFlowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"topLevelFlowLogic" : {
|
||||
"amount" : "10.00 USD",
|
||||
"issueRef" : "MTIzNA==",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"notary" : "O=Notary, L=London, C=GB"
|
||||
},
|
||||
"flowCallStackSummary" : [
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"progressStep" : "Paying recipient"
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashPaymentFlow",
|
||||
"progressStep" : "Generating anonymous identities"
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.confidential.SwapIdentitiesFlow",
|
||||
"progressStep" : "Awaiting counterparty's anonymous identity"
|
||||
}
|
||||
],
|
||||
"suspendedOn" : {
|
||||
"sendAndReceive" : [
|
||||
{
|
||||
"session" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : -5024519991106064492
|
||||
},
|
||||
"sentPayloadType" : "net.corda.confidential.SwapIdentitiesFlow$IdentityWithSignature",
|
||||
"sentPayload" : {
|
||||
"identity" : {
|
||||
"class" : "net.corda.core.identity.PartyAndCertificate",
|
||||
"deserialized" : "O=BankOfCorda, L=London, C=GB"
|
||||
},
|
||||
"signature" : "M5DN180OeE4M8jJ3mFohjgeqNYOWXzR6a2PIclJaWyit2uLnmJcZatySoSC12b6e4rQYKIICNFUXRzJnoQTQCg=="
|
||||
}
|
||||
}
|
||||
],
|
||||
"suspendedTimestamp" : "2019-08-12T15:38:39Z",
|
||||
"secondsSpentWaiting" : 7
|
||||
},
|
||||
"flowCallStack" : [
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashIssueAndPaymentFlow",
|
||||
"progressStep" : "Paying recipient",
|
||||
"flowLogic" : {
|
||||
"amount" : "10.00 USD",
|
||||
"issueRef" : "MTIzNA==",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"notary" : "O=Notary, L=London, C=GB"
|
||||
}
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.finance.flows.CashPaymentFlow",
|
||||
"progressStep" : "Generating anonymous identities",
|
||||
"flowLogic" : {
|
||||
"amount" : "10.00 USD",
|
||||
"recipient" : "O=BigCorporation, L=New York, C=US",
|
||||
"anonymous" : true,
|
||||
"issuerConstraint" : [ ],
|
||||
"notary" : "O=Notary, L=London, C=GB"
|
||||
}
|
||||
},
|
||||
{
|
||||
"flowClass" : "net.corda.confidential.SwapIdentitiesFlow",
|
||||
"progressStep" : "Awaiting counterparty's anonymous identity",
|
||||
"flowLogic" : {
|
||||
"otherSideSession" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : -5024519991106064492
|
||||
},
|
||||
"otherParty" : null
|
||||
}
|
||||
}
|
||||
],
|
||||
"origin" : {
|
||||
"rpc" : "bankUser"
|
||||
},
|
||||
"ourIdentity" : "O=BankOfCorda, L=London, C=GB",
|
||||
"activeSessions" : [ ],
|
||||
"errored" : null
|
||||
}
|
||||
|
||||
4. View the contents of the node agent diagnostics file:
|
||||
|
||||
@ -364,21 +533,27 @@ See also :ref:`Flow draining mode <draining-mode>`.
|
||||
* contacting other participants in the network where their nodes are not responding to an initiated flow.
|
||||
The checkpoint dump gives good diagnostics on the reason a flow may be suspended (including the destination peer participant node that is not responding):
|
||||
|
||||
.. sourcecode:: none
|
||||
.. sourcecode:: json
|
||||
|
||||
"suspendedOn" : {
|
||||
"sendAndReceive" : [
|
||||
{
|
||||
"session" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : 1443438003030966253
|
||||
},
|
||||
"sentPayloadType" : "net.corda.confidential.SwapIdentitiesFlow$IdentityWithSignature",
|
||||
"sentPayload" : {
|
||||
"identity" : {
|
||||
"class" : "net.corda.core.identity.PartyAndCertificate",
|
||||
"deserialized" : "O=BankOfCorda, L=London, C=GB"
|
||||
},
|
||||
"signature" : "t+7hyUnQE08n3ST4krA/7fi1R8ItdrGvpeEbMFgTBDCHibMWiKo/NaTSVUdfwPmsEtl1PFx0MHz5rtRQ+XuEBg=="
|
||||
}
|
||||
}
|
||||
{
|
||||
"suspendedOn" : {
|
||||
"sendAndReceive" : [
|
||||
{
|
||||
"session" : {
|
||||
"peer" : "O=BigCorporation, L=New York, C=US",
|
||||
"ourSessionId" : -5024519991106064492
|
||||
},
|
||||
"sentPayloadType" : "net.corda.confidential.SwapIdentitiesFlow$IdentityWithSignature",
|
||||
"sentPayload" : {
|
||||
"identity" : {
|
||||
"class" : "net.corda.core.identity.PartyAndCertificate",
|
||||
"deserialized" : "O=BankOfCorda, L=London, C=GB"
|
||||
},
|
||||
"signature" : "M5DN180OeE4M8jJ3mFohjgeqNYOWXzR6a2PIclJaWyit2uLnmJcZatySoSC12b6e4rQYKIICNFUXRzJnoQTQCg=="
|
||||
}
|
||||
}
|
||||
],
|
||||
"suspendedTimestamp" : "2019-08-12T15:38:39Z",
|
||||
"secondsSpentWaiting" : 7
|
||||
}
|
||||
}
|
@ -106,13 +106,23 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
checkpointStorage.getAllCheckpoints().use { stream ->
|
||||
ZipOutputStream(file.outputStream()).use { zip ->
|
||||
stream.forEach { (runId, serialisedCheckpoint) ->
|
||||
|
||||
if (isCheckpointAgentRunning)
|
||||
instrumentCheckpointAgent(runId)
|
||||
val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
val json = checkpoint.toJson(runId.uuid, now)
|
||||
val jsonBytes = writer.writeValueAsBytes(json)
|
||||
zip.putNextEntry(ZipEntry("${json.flowLogicClass.simpleName}-${runId.uuid}.json"))
|
||||
zip.write(jsonBytes)
|
||||
|
||||
val (bytes, fileName) = try {
|
||||
val checkpoint =
|
||||
serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
val json = checkpoint.toJson(runId.uuid, now)
|
||||
val jsonBytes = writer.writeValueAsBytes(json)
|
||||
jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json"
|
||||
} catch (e: Exception) {
|
||||
log.info("Failed to deserialise checkpoint with flowId: ${runId.uuid}", e)
|
||||
val errorBytes = checkpointDeserializationErrorMessage(runId, e).toByteArray()
|
||||
errorBytes to "Undeserialisable-checkpoint-${runId.uuid}.json"
|
||||
}
|
||||
zip.putNextEntry(ZipEntry(fileName))
|
||||
zip.write(bytes)
|
||||
zip.closeEntry()
|
||||
}
|
||||
}
|
||||
@ -162,32 +172,74 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
val flowCallStack = if (fiber != null) {
|
||||
// Poke into Quasar's stack and find the object references to the sub-flows so that we can correctly get the current progress
|
||||
// step for each sub-call.
|
||||
val stackObjects = fiber.declaredField<Stack>("stack").value.declaredField<Array<*>>("dataObject").value
|
||||
subFlowStack.map { subFlow ->
|
||||
val subFlowLogic = stackObjects.find(subFlow.flowClass::isInstance) as? FlowLogic<*>
|
||||
val currentStep = subFlowLogic?.progressTracker?.currentStep
|
||||
FlowCall(subFlow.flowClass, if (currentStep == ProgressTracker.UNSTARTED) null else currentStep?.label)
|
||||
}.reversed()
|
||||
val stackObjects = fiber.getQuasarStack()
|
||||
subFlowStack.map { it.toJson(stackObjects) }
|
||||
} else {
|
||||
emptyList()
|
||||
}
|
||||
|
||||
return CheckpointJson(
|
||||
id,
|
||||
flowLogic.javaClass,
|
||||
flowLogic,
|
||||
flowCallStack,
|
||||
(flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(suspendedTimestamp(), now),
|
||||
invocationContext.origin.toOrigin(),
|
||||
ourIdentity,
|
||||
sessions.mapNotNull { it.value.toActiveSession(it.key) },
|
||||
errorState as? ErrorState.Errored
|
||||
flowId = id,
|
||||
topLevelFlowClass = flowLogic.javaClass,
|
||||
topLevelFlowLogic = flowLogic,
|
||||
flowCallStackSummary = flowCallStack.toSummary(),
|
||||
flowCallStack = flowCallStack,
|
||||
suspendedOn = (flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(
|
||||
suspendedTimestamp(),
|
||||
now
|
||||
),
|
||||
origin = invocationContext.origin.toOrigin(),
|
||||
ourIdentity = ourIdentity,
|
||||
activeSessions = sessions.mapNotNull { it.value.toActiveSession(it.key) },
|
||||
errored = errorState as? ErrorState.Errored
|
||||
)
|
||||
}
|
||||
|
||||
private fun Checkpoint.suspendedTimestamp(): Instant = invocationContext.trace.invocationId.timestamp
|
||||
|
||||
private fun checkpointDeserializationErrorMessage(
|
||||
checkpointId: StateMachineRunId,
|
||||
exception: Exception
|
||||
): String {
|
||||
return """
|
||||
*** Unable to deserialise checkpoint: ${exception.message} ***
|
||||
*** Check logs for further information, checkpoint flowId: ${checkpointId.uuid} ***
|
||||
"""
|
||||
.trimIndent()
|
||||
}
|
||||
|
||||
private fun FlowStateMachineImpl<*>.getQuasarStack() =
|
||||
declaredField<Stack>("stack").value.declaredField<Array<*>>("dataObject").value
|
||||
|
||||
private fun SubFlow.toJson(stackObjects: Array<*>): FlowCall {
|
||||
val subFlowLogic = stackObjects.find(flowClass::isInstance) as? FlowLogic<*>
|
||||
val currentStep = subFlowLogic?.progressTracker?.currentStep
|
||||
return FlowCall(
|
||||
flowClass = flowClass,
|
||||
progressStep = if (currentStep == ProgressTracker.UNSTARTED) null else currentStep?.label,
|
||||
flowLogic = subFlowLogic
|
||||
)
|
||||
}
|
||||
|
||||
private fun List<FlowCall>.toSummary() = map {
|
||||
FlowCallSummary(
|
||||
it.flowClass,
|
||||
it.progressStep
|
||||
)
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
private class FlowCall(val flowClass: Class<*>, val progressStep: String?)
|
||||
private class FlowCallSummary(
|
||||
val flowClass: Class<*>,
|
||||
val progressStep: String?
|
||||
)
|
||||
|
||||
@Suppress("unused")
|
||||
private class FlowCall(
|
||||
val flowClass: Class<*>,
|
||||
val progressStep: String?,
|
||||
val flowLogic: FlowLogic<*>?
|
||||
)
|
||||
|
||||
@Suppress("unused")
|
||||
@JsonInclude(Include.NON_NULL)
|
||||
@ -211,15 +263,16 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
|
||||
@Suppress("unused")
|
||||
private class CheckpointJson(
|
||||
val id: UUID,
|
||||
val flowLogicClass: Class<FlowLogic<*>>,
|
||||
val flowLogic: FlowLogic<*>,
|
||||
val flowCallStack: List<FlowCall>,
|
||||
val suspendedOn: SuspendedOn?,
|
||||
val origin: Origin,
|
||||
val ourIdentity: Party,
|
||||
val activeSessions: List<ActiveSession>,
|
||||
val errored: ErrorState.Errored?
|
||||
val flowId: UUID,
|
||||
val topLevelFlowClass: Class<FlowLogic<*>>,
|
||||
val topLevelFlowLogic: FlowLogic<*>,
|
||||
val flowCallStackSummary: List<FlowCallSummary>,
|
||||
val suspendedOn: SuspendedOn?,
|
||||
val flowCallStack: List<FlowCall>,
|
||||
val origin: Origin,
|
||||
val ourIdentity: Party,
|
||||
val activeSessions: List<ActiveSession>,
|
||||
val errored: ErrorState.Errored?
|
||||
)
|
||||
|
||||
@Suppress("unused")
|
||||
@ -236,7 +289,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
val customOperation: FlowIORequest.ExecuteAsyncOperation<*>? = null,
|
||||
val forceCheckpoint: FlowIORequest.ForceCheckpoint? = null
|
||||
) {
|
||||
@JsonFormat(pattern ="yyyy-MM-dd'T'HH:mm:ss", timezone = "UTC")
|
||||
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'", timezone = "UTC")
|
||||
lateinit var suspendedTimestamp: Instant
|
||||
var secondsSpentWaiting: Long = 0
|
||||
}
|
||||
@ -247,7 +300,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
private fun FlowIORequest<*>.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn {
|
||||
fun Map<FlowSession, SerializedBytes<Any>>.toJson(): List<SendJson> {
|
||||
return map {
|
||||
val payload = it.value.deserialize()
|
||||
val payload = it.value.deserializeOrOutputPlaceholder()
|
||||
SendJson(it.key, payload.javaClass, payload)
|
||||
}
|
||||
}
|
||||
@ -272,6 +325,12 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
}
|
||||
}
|
||||
|
||||
private fun SerializedBytes<Any>.deserializeOrOutputPlaceholder() = try {
|
||||
deserialize()
|
||||
} catch (e: Exception) {
|
||||
"*** Unable to deserialise message payload: ${e.message} ***"
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
private class ActiveSession(
|
||||
val peer: Party,
|
||||
|
Loading…
Reference in New Issue
Block a user