mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
CORDA-3071 - Checkpoint agent tool (#5295)
* Initial commit based on experimental kryo hook agent.
* WIP
* Added documentation.
* Additional improvements and documentation following more testing.
* Added field level instrumentation + basic type handlers for String, byteArray, charArray, primitive types.
* Working version (without array type handling)
* Missing build.gradle file.
* Handle display of Arrays and String. Pruning output to avoid repetition (by loop depth, object count).
* Added configurable StackDepth (for display purposes) and filter out ProgressTracker stacks.
* Further array handling (Object arrays, 2D, 3D), improved display and general code cleanup.
* Various fixes and improvements following demo to RP.
* Clean-up
* Further clean-up
* Set checkpoint id before deserialization.
* Update documentation
* Final clean-up.
* Minor documentation fixes.
* Updates following PR review feedback.
* Add changelog entry.
This commit is contained in:
@ -341,7 +341,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
installCoreFlows()
|
||||
registerCordappFlows()
|
||||
services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
|
||||
val checkpointDumper = CheckpointDumper(checkpointStorage, database, services)
|
||||
val checkpointDumper = CheckpointDumper(checkpointStorage, database, services, services.configuration.baseDirectory)
|
||||
val rpcOps = makeRPCOps(cordappLoader, checkpointDumper)
|
||||
startShell()
|
||||
networkMapClient?.start(trustRoot)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import co.paralleluniverse.fibers.Stack
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
|
||||
import com.fasterxml.jackson.annotation.JsonFormat
|
||||
@ -24,9 +25,11 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
@ -38,11 +41,12 @@ import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
import net.corda.serialization.internal.withTokenContext
|
||||
import sun.misc.VMSupport
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.ZoneOffset.UTC
|
||||
@ -53,7 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.zip.ZipEntry
|
||||
import java.util.zip.ZipOutputStream
|
||||
|
||||
class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHubInternal) {
|
||||
class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHub, val baseDirectory: Path) {
|
||||
companion object {
|
||||
private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
|
||||
private val log = contextLogger()
|
||||
@ -64,6 +68,10 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
private lateinit var checkpointSerializationContext: CheckpointSerializationContext
|
||||
private lateinit var writer: ObjectWriter
|
||||
|
||||
/**
 * Result of [checkpointAgentRunning], computed on first access and cached by `lazy` afterwards.
 */
private val isCheckpointAgentRunning: Boolean by lazy { checkpointAgentRunning() }
|
||||
|
||||
fun start(tokenizableServices: List<Any>) {
|
||||
checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
|
||||
CheckpointSerializeAsTokenContextImpl(
|
||||
@ -91,13 +99,15 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
|
||||
fun dump() {
|
||||
val now = serviceHub.clock.instant()
|
||||
val file = serviceHub.configuration.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip"
|
||||
val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip"
|
||||
try {
|
||||
if (lock.getAndIncrement() == 0 && !file.exists()) {
|
||||
database.transaction {
|
||||
checkpointStorage.getAllCheckpoints().use { stream ->
|
||||
ZipOutputStream(file.outputStream()).use { zip ->
|
||||
stream.forEach { (runId, serialisedCheckpoint) ->
|
||||
if (isCheckpointAgentRunning)
|
||||
instrumentCheckpointAgent(runId)
|
||||
val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
val json = checkpoint.toJson(runId.uuid, now)
|
||||
val jsonBytes = writer.writeValueAsBytes(json)
|
||||
@ -116,6 +126,28 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Sets the `checkpointId` field of the checkpoint agent's hook object to the UUID of [checkpointId].
 *
 * The hook class is looked up reflectively by name (`net.corda.tools.CheckpointHook`), so this code has
 * no compile-time dependency on the agent. This is best effort: any [Exception] raised while locating
 * the hook or setting the field is logged and not rethrown.
 *
 * @param checkpointId the run id of the checkpoint the agent should be told about.
 */
private fun instrumentCheckpointAgent(checkpointId: StateMachineRunId) {
    log.info("Checkpoint agent diagnostics for checkpointId: $checkpointId")
    try {
        val checkpointHook = Class.forName("net.corda.tools.CheckpointHook").kotlin
        if (checkpointHook.objectInstance == null) {
            log.info("Instantiating checkpoint agent object instance")
        }
        val instance = checkpointHook.objectOrNewInstance()
        val checkpointIdField = instance.declaredField<UUID>(instance.javaClass, "checkpointId")
        checkpointIdField.value = checkpointId.uuid
    } catch (e: Exception) {
        // Hand the exception to the logger rather than interpolating e.message: the message may be
        // null, and the stack trace is needed to diagnose reflective failures.
        log.error("Checkpoint agent instrumentation failed for checkpointId: $checkpointId", e)
    }
}
|
||||
|
||||
/**
 * Scans the JVM agent properties reported by [VMSupport.getAgentProperties].
 *
 * @return `true` if at least one property value is a [String] containing `checkpoint-agent.jar`,
 * `false` otherwise.
 */
private fun checkpointAgentRunning(): Boolean =
        VMSupport.getAgentProperties().values
                .filterIsInstance<String>()
                .any { "checkpoint-agent.jar" in it }
|
||||
|
||||
private fun Checkpoint.toJson(id: UUID, now: Instant): CheckpointJson {
|
||||
val (fiber, flowLogic) = when (flowState) {
|
||||
is FlowState.Unstarted -> {
|
||||
|
@ -0,0 +1,102 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
|
||||
import net.corda.core.serialization.internal.checkpointSerialize
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.node.services.statemachine.FlowStart
|
||||
import net.corda.node.services.statemachine.SubFlowVersion
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.nio.file.Paths
|
||||
|
||||
/**
 * Smoke tests for [CheckpointDumper]: each test stores a single freshly created checkpoint in a test
 * database and then calls [CheckpointDumper.dump].
 *
 * Neither test asserts on the produced output yet; the trailing comments in each test list the checks
 * that are still to be written.
 */
class CheckpointDumperTest {

    @Rule
    @JvmField
    val testSerialization = SerializationEnvironmentRule()

    private val myself = TestIdentity(CordaX500Name("Me", "London", "GB"))
    private lateinit var database: CordaPersistence
    private lateinit var services: MockServices
    private lateinit var checkpointStorage: DBCheckpointStorage

    /** Builds the test database and mock services, then creates the checkpoint storage. */
    @Before
    fun setUp() {
        val (persistence, mockServiceHub) = MockServices.makeTestDatabaseAndPersistentServices(
                cordappPackages = emptyList(),
                initialIdentity = myself,
                moreIdentities = emptySet(),
                moreKeys = emptySet()
        )
        database = persistence
        services = mockServiceHub
        newCheckpointStorage()
    }

    /** Closes the test database opened in [setUp]. */
    @After
    fun cleanUp() {
        database.close()
    }

    @Test
    fun testDumpCheckpoints() {
        dumpSingleCheckpoint()
        // check existence of output zip file: checkpoints_dump-<date>.zip
    }

    // This test will only succeed when the VM startup includes the "checkpoint-agent":
    // -javaagent:tools/checkpoint-agent/build/libs/checkpoint-agent.jar
    @Test
    fun testDumpCheckpointsAndAgentDiagnostics() {
        dumpSingleCheckpoint()
        // check existence of output zip file: checkpoints_dump-<date>.zip
        // check existence of output agent log: checkpoints_agent-<date>.log
    }

    /**
     * Shared body of both tests: starts a dumper rooted at the current directory, stores one new
     * checkpoint and triggers a dump.
     */
    private fun dumpSingleCheckpoint() {
        val dumper = CheckpointDumper(checkpointStorage, database, services, Paths.get("."))
        dumper.start(emptyList())

        // add a checkpoint
        val (runId, serialisedCheckpoint) = newCheckpoint()
        database.transaction {
            checkpointStorage.addCheckpoint(runId, serialisedCheckpoint)
        }

        dumper.dump()
    }

    /** Creates the [DBCheckpointStorage] inside a database transaction. */
    private fun newCheckpointStorage() {
        checkpointStorage = database.transaction { DBCheckpointStorage() }
    }

    /**
     * Creates a random run id paired with a serialized checkpoint for a no-op flow.
     *
     * @param version the value passed to [SubFlowVersion.CoreFlow] for the checkpoint.
     */
    private fun newCheckpoint(version: Int = 1): Pair<StateMachineRunId, SerializedBytes<Checkpoint>> {
        val id = StateMachineRunId.createRandom()
        val context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
        val logic: FlowLogic<*> = object : FlowLogic<Unit>() {
            override fun call() {}
        }
        val checkpoint = Checkpoint.create(
                InvocationContext.shell(),
                FlowStart.Explicit,
                logic.javaClass,
                logic.checkpointSerialize(context = context),
                myself.identity.party,
                SubFlowVersion.CoreFlow(version),
                false
        ).getOrThrow()
        return id to checkpoint.checkpointSerialize(context = context)
    }
}
|
Reference in New Issue
Block a user