added checkpoints debug shell command (#6574)

This commit is contained in:
Walter Oggioni 2020-08-25 11:10:25 +02:00 committed by GitHub
parent 57f4858a29
commit 49f598308b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 149 additions and 5 deletions

View File

@ -10,4 +10,8 @@ interface FlowManagerRPCOps : RPCOps {
* Dump all the current flow checkpoints as JSON into a zip file in the node's log directory.
*/
fun dumpCheckpoints()
/** Dump all the current flow checkpoints, alongside with the node's main jar, all CorDapps and driver jars
* into a zip file in the node's log directory. */
fun debugCheckpoints()
}

View File

@ -370,7 +370,12 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
@Volatile
private var _started: S? = null
private val checkpointDumper = CheckpointDumperImpl(checkpointStorage, database, services, services.configuration.baseDirectory)
private val checkpointDumper = CheckpointDumperImpl(
checkpointStorage,
database,
services,
services.configuration.baseDirectory,
services.configuration.cordappDirectories)
private var notaryService: NotaryService? = null

View File

@ -12,4 +12,6 @@ internal class FlowManagerRPCOpsImpl(private val checkpointDumper: CheckpointDum
override val protocolVersion: Int = PLATFORM_VERSION
override fun dumpCheckpoints() = checkpointDumper.dumpCheckpoints()
override fun debugCheckpoints() = checkpointDumper.debugCheckpoints()
}

View File

@ -71,7 +71,11 @@ import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.repo
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext
import java.io.InputStream
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
@ -79,14 +83,17 @@ import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.CRC32
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.reflect.KProperty1
import kotlin.reflect.full.companionObject
import kotlin.reflect.full.memberProperties
import kotlin.streams.asSequence
class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence,
private val serviceHub: ServiceHub, val baseDirectory: Path) : NodeLifecycleObserver {
private val serviceHub: ServiceHub, val baseDirectory: Path,
private val cordappDirectories: Iterable<Path>) : NodeLifecycleObserver {
companion object {
internal val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
private val log = contextLogger()
@ -95,6 +102,68 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
Checkpoint.FlowStatus.HOSPITALIZED,
Checkpoint.FlowStatus.PAUSED
)
private fun writeFiber2Zip(zipOutputStream : ZipOutputStream,
context: CheckpointSerializationContext,
runId: StateMachineRunId,
flowState: FlowState.Started) {
@Suppress("TooGenericExceptionCaught")
try {
flowState.frozenFiber.checkpointDeserialize(context)
} catch (e: Exception) {
log.error("Failed to deserialise checkpoint with flowId: ${runId.uuid}", e)
null
}?.let { fiber ->
val zipEntry = ZipEntry("fibers/${fiber.logic.javaClass.name}-${runId.uuid}.fiber").apply {
//Fibers can easily be compressed, so they are stored as DEFLATED
method = ZipEntry.DEFLATED
}
zipOutputStream.putNextEntry(zipEntry)
zipOutputStream.write(flowState.frozenFiber.bytes)
zipOutputStream.closeEntry()
}
}
private fun computeSizeAndCrc32(inputStream: InputStream,
buffer : ByteArray) : Pair<Long, Long> {
val crc32 = CRC32()
var sz = 0L
while (true) {
val read = inputStream.read(buffer)
if (read < 0) break
sz += read
crc32.update(buffer, 0, read)
}
return sz to crc32.value
}
private fun write2Zip(zip: ZipOutputStream,
inputStream: InputStream,
buffer : ByteArray) {
while (true) {
val read = inputStream.read(buffer)
if (read < 0) break
zip.write(buffer, 0, read)
}
}
private fun writeStoredEntry(zip : ZipOutputStream, source : Path, destinationFileName : String, buffer : ByteArray) {
val zipEntry = ZipEntry(destinationFileName).apply {
// A stored ZipEntry requires computing the size and CRC32 in advance
val (sz, crc32) = Files.newInputStream(source).use {
computeSizeAndCrc32(it, buffer)
}
method = ZipEntry.STORED
size = sz
compressedSize = sz
crc = crc32
}
zip.putNextEntry(zipEntry)
Files.newInputStream(source).use {
write2Zip(zip, it, buffer)
}
zip.closeEntry()
}
}
override val priority: Int = SERVICE_PRIORITY_NORMAL
@ -176,6 +245,57 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
}
}
@Suppress("ComplexMethod")
fun debugCheckpoints() {
val now = serviceHub.clock.instant()
val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "checkpoints_debug-${TIME_FORMATTER.format(now)}.zip"
try {
if (lock.getAndIncrement() == 0 && !file.exists()) {
database.transaction {
checkpointStorage.getCheckpoints(DUMPABLE_CHECKPOINTS).use { stream ->
ZipOutputStream(file.outputStream()).use { zip ->
@Suppress("MagicNumber")
val buffer = ByteArray(0x10000)
//Dump checkpoints in "fibers" folder
for((runId, serializedCheckpoint) in stream) {
val flowState = serializedCheckpoint.deserialize(checkpointSerializationContext).flowState
if(flowState is FlowState.Started) writeFiber2Zip(zip, checkpointSerializationContext, runId, flowState)
}
val jarFilter = { directoryEntry : Path -> directoryEntry.fileName.toString().endsWith(".jar") }
//Dump cordApps jar in the "cordapp" folder
for(cordappDirectory in cordappDirectories) {
val corDappJars = Files.list(cordappDirectory).filter(jarFilter).asSequence()
corDappJars.forEach { corDappJar ->
//Jar files are already compressed, so they are stored in the zip as they are
writeStoredEntry(zip, corDappJar, "cordapps/${corDappJar.fileName}", buffer)
}
}
//Dump all jars contained in the corda.jar in the lib directory and dump all
// the driver jars in the driver folder of the node to the driver folder of the dump file
val pairs = listOf(
"lib" to FileSystems.newFileSystem(
Paths.get(System.getProperty("capsule.jar")), null).getPath("/"),
"drivers" to baseDirectory.resolve("drivers")
)
for((dest, source) in pairs) {
Files.list(source).filter(jarFilter).forEach { jarEntry ->
writeStoredEntry(zip, jarEntry, "$dest/${jarEntry.fileName}", buffer)
}
}
}
}
}
} else {
log.info("Flow dump already in progress, skipping current call")
}
} finally {
lock.decrementAndGet()
}
}
private fun instrumentCheckpointAgent(checkpointId: StateMachineRunId) {
log.info("Checkpoint agent diagnostics for checkpointId: $checkpointId")
try {

View File

@ -56,6 +56,7 @@ class CheckpointDumperImplTest {
private val myself = TestIdentity(CordaX500Name(organisation, "London", "GB"))
private val currentTimestamp = Instant.parse("2019-12-25T10:15:30.00Z")
private val baseDirectory = Files.createTempDirectory("CheckpointDumperTest")
private val corDappDirectories = listOf(baseDirectory.resolve("cordapps"))
private val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME /
"checkpoints_dump-${CheckpointDumperImpl.TIME_FORMATTER.format(currentTimestamp)}.zip"
@ -102,7 +103,7 @@ class CheckpointDumperImplTest {
@Test(timeout=300_000)
fun testDumpCheckpoints() {
val dumper = CheckpointDumperImpl(checkpointStorage, database, services, baseDirectory)
val dumper = CheckpointDumperImpl(checkpointStorage, database, services, baseDirectory, corDappDirectories)
dumper.update(mockAfterStartEvent)
// add a checkpoint
@ -117,7 +118,7 @@ class CheckpointDumperImplTest {
@Test(timeout=300_000)
fun `Checkpoint dumper doesn't output completed checkpoints`() {
val dumper = CheckpointDumperImpl(checkpointStorage, database, services, baseDirectory)
val dumper = CheckpointDumperImpl(checkpointStorage, database, services, baseDirectory, corDappDirectories)
dumper.update(mockAfterStartEvent)
// add a checkpoint
@ -157,7 +158,7 @@ class CheckpointDumperImplTest {
// -javaagent:tools/checkpoint-agent/build/libs/checkpoint-agent.jar
@Test(timeout=300_000)
fun testDumpCheckpointsAndAgentDiagnostics() {
val dumper = CheckpointDumperImpl(checkpointStorage, database, services, Paths.get("."))
val dumper = CheckpointDumperImpl(checkpointStorage, database, services, Paths.get("."), Paths.get("cordapps"))
dumper.update(mockAfterStartEvent)
// add a checkpoint

View File

@ -24,4 +24,11 @@ public class CheckpointShellCommand extends InteractiveShellCommand<FlowManagerR
public void dump() {
runDumpCheckpoints(ops());
}
@Command
@Man("Outputs the contents of all started flow checkpoints in a zip file")
@Usage("Outputs the contents of all started flow checkpoints in a zip file")
public void debug() {
runDebugCheckpoints(ops());
}
}

View File

@ -565,6 +565,11 @@ object InteractiveShell {
rpcOps.dumpCheckpoints()
}
@JvmStatic
fun runDebugCheckpoints(rpcOps: FlowManagerRPCOps) {
rpcOps.debugCheckpoints()
}
@JvmStatic
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps,
inputObjectMapper: ObjectMapper): Any? {