CORDA-3083: Backport dumpCheckpoints shell command (#5305)

This is a port of https://github.com/corda/corda/pull/5171 with some further improvements like printing out the contents of all the flows in the sub-flow stack (rather than just the current progress tracker step) and better error handling.
This commit is contained in:
Dan Newton 2019-07-18 10:42:14 +01:00 committed by Shams Asari
parent 2b36b6d8cf
commit 43dfd636dc
20 changed files with 530 additions and 43 deletions

View File

@ -216,15 +216,17 @@ open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
/** Returns a string-to-string map of commands to a string describing available parameter types. */
val availableCommands: Map<String, String>
get() {
return methodMap.entries().map { entry ->
val (name, args) = entry // TODO: Kotlin 1.1
val argStr = if (args.parameterCount == 0) "" else {
val paramNames = methodParamNames[name]!!
val typeNames = args.parameters.map { it.type.simpleName }
val paramTypes = paramNames.zip(typeNames)
paramTypes.map { "${it.first}: ${it.second}" }.joinToString(", ")
return methodMap.entries().mapNotNull { (name, args) ->
if (args.parameterCount == 0) {
Pair(name, "")
} else {
methodParamNames[name]?. let { params ->
val typeNames = args.parameters.map { it.type.simpleName }
val paramTypes = params.zip(typeNames)
val paramNames = paramTypes.joinToString(", ") { "${it.first}: ${it.second}" }
Pair(name, paramNames)
}
}
Pair(name, argStr)
}.toMap()
}
}

View File

@ -6,6 +6,7 @@ import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
@ -104,7 +105,7 @@ class CordaRPCClient private constructor(
}
}
private val rpcClient = RPCClient<CordaRPCOps>(
private val rpcClient = RPCClient<InternalCordaRPCOps>(
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
configuration.toRpcClientConfiguration(),
KRYO_RPC_CLIENT_CONTEXT
@ -136,7 +137,7 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection {
return CordaRPCConnection(rpcClient.start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor))
return CordaRPCConnection(rpcClient.start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor))
}
/**

View File

@ -0,0 +1,12 @@
package net.corda.core.internal.messaging
import net.corda.core.messaging.CordaRPCOps
/**
* Contains internal RPC functions that should not be publicly exposed in [CordaRPCOps]
*/
interface InternalCordaRPCOps : CordaRPCOps {
/** Dump all the current flow checkpoints as JSON into a zip file in the node's log directory. */
fun dumpCheckpoints()
}

View File

@ -20,6 +20,7 @@ import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.SecureCordaRPCOps
import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContractV2
import net.corda.testing.contracts.DummyContractV3
@ -128,7 +129,7 @@ class ContractUpgradeFlowTest {
return startRpcClient<CordaRPCOps>(
rpcAddress = startRpcServer(
rpcUser = user,
ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services)
ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services, CheckpointDumper(node.checkpointStorage, node.database, node.services))
).get().broker.hostAndPort!!,
username = user.username,
password = user.password

View File

@ -16,6 +16,10 @@ Version 3.4
* Documentation updates
* Information about checkpointed flows can be retrieved from the shell. Calling ``dumpCheckpoints`` will create a zip file inside the node's
``log`` directory. This zip will contain a JSON representation of each checkpointed flow. This information can then be used to determine the
state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention.
Version 3.3
-----------

View File

@ -275,6 +275,17 @@ a drain is complete there should be no outstanding checkpoints or running flows.
A node can be drained or undrained via RPC using the ``setFlowsDrainingModeEnabled`` method, and via the shell using
the standard ``run`` command to invoke the RPC. See :doc:`shell` to learn more.
To assist in draining a node, the ``dumpCheckpoints`` shell command will output JSON representations of each checkpointed flow.
A zip containing the JSON files is created in the ``logs`` directory of the node. This information can then be used to determine the
state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention. To drain these flows,
the node will need to be restarted or the flow will need to be removed manually.
.. warning:: Deleting checkpoints manually can lead to an inconsistent ledger among transacting parties. Great care
and coordination with a flow's counterparties must be taken to ensure that a initiating flow and flows responding to it are correctly
removed. This experience will be improved in the future. Making it easier to kill flows while notifying their counterparties.
.. _contract_upgrading_ref:
Contract and state versioning
-----------------------------
@ -641,4 +652,4 @@ Then, in ``generateMappedObject``, add support for the new schema:
}
With this approach, whenever the state object is stored in the vault, a representation of it will be stored in two
separate database tables where possible - one for each supported schema.
separate database tables where possible - one for each supported schema.

View File

@ -1,9 +1,12 @@
package net.corda.node.shell;
import net.corda.core.messaging.*;
import net.corda.client.jackson.*;
import org.crsh.cli.*;
import org.crsh.command.*;
import net.corda.client.jackson.StringToMethodCallParser;
import net.corda.core.internal.messaging.InternalCordaRPCOps;
import org.crsh.cli.Argument;
import org.crsh.cli.Command;
import org.crsh.cli.Man;
import org.crsh.cli.Usage;
import org.crsh.command.InvocationContext;
import java.util.*;
@ -23,7 +26,7 @@ public class RunShellCommand extends InteractiveShellCommand {
InvocationContext<Map> context,
@Usage("The command to run") @Argument(unquote = false) List<String> command
) {
StringToMethodCallParser<CordaRPCOps> parser = new StringToMethodCallParser<>(CordaRPCOps.class, objectMapper());
StringToMethodCallParser<InternalCordaRPCOps> parser = new StringToMethodCallParser<>(InternalCordaRPCOps.class, objectMapper());
if (command == null) {
emitHelp(context, parser);
@ -33,7 +36,7 @@ public class RunShellCommand extends InteractiveShellCommand {
return InteractiveShell.runRPCFromString(command, out, context, ops());
}
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<InternalCordaRPCOps> parser) {
// Sends data down the pipeline about what commands are available. CRaSH will render it nicely.
// Each element we emit is a map of column -> content.
Set<Map.Entry<String, String>> entries = parser.getAvailableCommands().entrySet();

View File

@ -17,6 +17,7 @@ import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.*
import net.corda.core.node.*
@ -49,6 +50,7 @@ import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.*
import net.corda.node.services.persistence.*
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.*
import net.corda.node.services.transactions.*
@ -159,8 +161,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter)
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager, checkpointDumper: CheckpointDumper): InternalCordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter, checkpointDumper)
}
private fun initCertificate() {
@ -242,6 +244,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties)
val checkpointDumper = CheckpointDumper(checkpointStorage, database, services)
(serverThread as? ExecutorService)?.let {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
@ -252,7 +256,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, database, smm)
val rpcOps = makeRPCOps(flowStarter, database, smm, checkpointDumper)
startMessagingService(rpcOps)
installCoreFlows()
val cordaServices = installCordaServices(flowStarter)
@ -260,6 +264,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell(rpcOps)
checkpointDumper.start(tokenizableServices)
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
@ -293,7 +298,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
*/
protected abstract fun getRxIoScheduler(): Scheduler
open fun startShell(rpcOps: CordaRPCOps) {
open fun startShell(rpcOps: InternalCordaRPCOps) {
InteractiveShell.startShell(configuration, rpcOps, securityManager, _services.identityService, _services.database)
}

View File

@ -15,6 +15,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.sign
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
@ -28,6 +29,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.RejectedCommandException
@ -45,8 +47,9 @@ internal class CordaRPCOpsImpl(
private val services: ServiceHubInternal,
private val smm: StateMachineManager,
private val database: CordaPersistence,
private val flowStarter: FlowStarter
) : CordaRPCOps {
private val flowStarter: FlowStarter,
private val checkpointDumper: CheckpointDumper
) : InternalCordaRPCOps {
override fun networkMapSnapshot(): List<NodeInfo> {
val (snapshot, updates) = networkMapFeed()
updates.notUsed()
@ -102,6 +105,8 @@ internal class CordaRPCOpsImpl(
}
}
override fun dumpCheckpoints() = checkpointDumper.dump()
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
val (snapshot, updates) = stateMachinesFeed()
updates.notUsed()

View File

@ -7,7 +7,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.node.NodeInfo
@ -20,7 +20,7 @@ import java.io.InputStream
import java.security.PublicKey
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val context: () -> RpcAuthContext) : CordaRPCOps {
class RpcAuthorisationProxy(private val implementation: InternalCordaRPCOps, private val context: () -> RpcAuthContext) : InternalCordaRPCOps {
override fun networkParametersFeed(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> = guard("networkParametersFeed") {
implementation.networkParametersFeed()
}
@ -179,4 +179,6 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
return action()
}
}
override fun dumpCheckpoints() = guard("dumpCheckpoints", implementation::dumpCheckpoints)
}

View File

@ -1,9 +1,11 @@
package net.corda.node.internal
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.rpcContext
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -14,7 +16,8 @@ class SecureCordaRPCOps(services: ServiceHubInternal,
smm: StateMachineManager,
database: CordaPersistence,
flowStarter: FlowStarter,
val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) {
checkpointDumper: CheckpointDumper,
val unsafe: InternalCordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, checkpointDumper)) : InternalCordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) {
/**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed

View File

@ -3,6 +3,7 @@ package net.corda.node.services.api
import net.corda.core.crypto.SecureHash
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.FlowStateMachineImpl
import java.util.stream.Stream
/**
* Thread-safe storage of fiber checkpoints.
@ -27,6 +28,10 @@ interface CheckpointStorage {
*/
fun forEach(block: (Checkpoint) -> Boolean)
/**
* Added as part of [CheckpointDumper] backport. Replacement for [CheckpointStorage.forEach].
*/
fun getAllCheckpoints(): Stream<Pair<String, Checkpoint>>
}
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).

View File

@ -5,11 +5,12 @@ import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import org.hibernate.annotations.Type
import java.io.Serializable
import java.util.stream.Stream
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import org.hibernate.annotations.Type
/**
* Simple checkpoint key value storage in DB.
@ -56,4 +57,14 @@ class DBCheckpointStorage : CheckpointStorage {
}
}
}
override fun getAllCheckpoints(): Stream<Pair<String, Checkpoint>> {
val session = currentDBSession()
val criteriaQuery = session.criteriaBuilder.createQuery(DBCheckpoint::class.java)
val root = criteriaQuery.from(DBCheckpoint::class.java)
criteriaQuery.select(root)
return session.createQuery(criteriaQuery).stream().map {
it.checkpointId to Checkpoint(SerializedBytes(it.checkpoint))
}
}
}

View File

@ -0,0 +1,420 @@
package net.corda.node.services.rpc
import co.paralleluniverse.fibers.Stack
import com.fasterxml.jackson.annotation.*
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.util.DefaultIndenter
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier
import com.google.common.primitives.Booleans
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.internal.jsonObject
import net.corda.core.context.InvocationOrigin
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.*
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.NodeStartup
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
import net.corda.nodeapi.internal.serialization.withTokenContext
import java.lang.reflect.Field
import java.nio.file.Files
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHubInternal) {
companion object {
private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
private val log = contextLogger()
}
private val lock = AtomicInteger(0)
private lateinit var checkpointSerializationContext: SerializeAsTokenContextImpl
private lateinit var writer: ObjectWriter
fun start(tokenizableServices: List<Any>) {
checkpointSerializationContext = SerializeAsTokenContextImpl(
tokenizableServices,
SerializationDefaults.SERIALIZATION_FACTORY,
SerializationDefaults.CHECKPOINT_CONTEXT,
serviceHub
)
val mapper = JacksonSupport.createNonRpcMapper()
mapper.registerModule(SimpleModule().apply {
setSerializerModifier(CheckpointDumperBeanModifier)
addSerializer(FlowSessionImplSerializer)
addSerializer(MapSerializer)
addSerializer(AttachmentSerializer)
setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java)
setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java)
setMixInAnnotation(SignedTransaction::class.java, SignedTransactionMixin::class.java)
setMixInAnnotation(WireTransaction::class.java, WireTransactionMixin::class.java)
setMixInAnnotation(FlowSessionInternal::class.java, FlowSessionInternalMixin::class.java)
})
val prettyPrinter = DefaultPrettyPrinter().apply {
indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE)
}
writer = mapper.writer(prettyPrinter)
}
fun dump() {
val now = serviceHub.clock.instant()
val file = serviceHub.configuration.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip"
try {
if (lock.getAndIncrement() == 0 && !file.exists()) {
database.transaction {
checkpointStorage.getAllCheckpoints().use { stream ->
ZipOutputStream(Files.newOutputStream(file)).use { zip ->
stream.forEach { (runId, serialisedCheckpoint) ->
try {
val checkpoint = serialisedCheckpoint.serializedFiber.deserialize(
context = SerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(checkpointSerializationContext)
)
val json = checkpoint.toJson(runId, now)
val jsonBytes = writer.writeValueAsBytes(json)
zip.putNextEntry(ZipEntry("${json.topLevelFlowClass.simpleName}-$runId.json"))
zip.write(jsonBytes)
zip.closeEntry()
} catch (e: Exception) {
log.info("Failed to deserialise checkpoint with id: $runId", e)
zip.putNextEntry(ZipEntry("Undeserialisable-checkpoint-$runId.json"))
zip.write(
"""
*** Unable to deserialise checkpoint: ${e.message} ***
*** Check logs for further information, checkpoint id: $runId ***
"""
.trimIndent().toByteArray()
)
zip.closeEntry()
}
}
}
}
}
} else {
log.info("Flow dump already in progress, skipping current call")
}
} finally {
lock.decrementAndGet()
}
}
private fun FlowStateMachineImpl<*>.toJson(id: String, now: Instant): CheckpointJson {
// Poke into Quasar's stack and find the object references to the sub-flows so that we can correctly get the current progress
// step for each sub-call.
val stackObjects = declaredField<Stack>("stack").value
.declaredField<Array<*>>("dataObject").value
.filterIsInstance<FlowLogic<*>>()
.toSet()
.map {
FlowCall(
it.javaClass,
if (it.progressTracker?.currentStep == ProgressTracker.UNSTARTED) null else it.progressTracker?.currentStep?.label,
it
)
}
val activeSessions = openSessions.mapNotNull { (key, session) ->
ActiveSession(
key.second,
session.ourSessionId,
session.receivedMessages.toList(),
(session.state as? FlowSessionState.Initiated)?.peerSessionId
)
}
return CheckpointJson(
id,
logic.javaClass,
stackObjects,
waitingForResponse?.toSuspendedOn(suspendedTimestamp(), now),
context.origin.toOrigin(),
ourIdentity,
activeSessions
)
}
private fun FlowStateMachineImpl<*>.suspendedTimestamp(): Instant = context.trace.invocationId.timestamp
@Suppress("unused")
private class FlowCall(val flowClass: Class<*>, val progressStep: String?, val flowLogic: FlowLogic<*>)
@Suppress("unused")
@JsonInclude(Include.NON_NULL)
private class Origin(
val rpc: String? = null,
val peer: CordaX500Name? = null,
val service: String? = null,
val scheduled: ScheduledStateRef? = null,
val shell: InvocationOrigin.Shell? = null
)
private fun InvocationOrigin.toOrigin(): Origin {
return when (this) {
is InvocationOrigin.RPC -> Origin(rpc = actor.id.value)
is InvocationOrigin.Peer -> Origin(peer = party)
is InvocationOrigin.Service -> Origin(service = serviceClassName)
is InvocationOrigin.Scheduled -> Origin(scheduled = scheduledState)
is InvocationOrigin.Shell -> Origin(shell = this)
}
}
@Suppress("unused")
private class CheckpointJson(
val flowId: String,
val topLevelFlowClass: Class<FlowLogic<*>>,
val flowCallStack: List<FlowCall>,
val suspendedOn: SuspendedOn?,
val origin: Origin,
val ourIdentity: Party,
val activeSessions: List<ActiveSession>
)
@Suppress("unused")
@JsonInclude(Include.NON_NULL)
private class SuspendedOn(
val receive: ReceiveOnly? = null,
val sendAndReceive: SendAndReceiveJson? = null,
val waitForLedgerCommit: SecureHash? = null,
val receiveAll: ReceiveAll? = null
) {
@JsonFormat(pattern = "yyyy-MM-dd'T'HH:mm:ss", timezone = "UTC")
lateinit var suspendedTimestamp: Instant
var secondsSpentWaiting: Long = 0
}
@Suppress("unused")
private class SendAndReceiveJson(val session: FlowSession, val sentPayloadType: Class<*>?, val sentPayload: Any?, val receivedPayloadType: Class<*>?)
private fun WaitingRequest.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn {
fun SendAndReceive.toJson(): SendAndReceiveJson {
val payload = when (this.message) {
is ExistingSessionMessage -> (message.payload as? DataSessionMessage)?.payload?.deserializeOrOutputPlaceholder()
is InitialSessionMessage -> message.firstPayload?.deserializeOrOutputPlaceholder()
}
return SendAndReceiveJson(session.flowSession, payload?.javaClass, payload, userReceiveType)
}
return when (this) {
is ReceiveOnly -> SuspendedOn(receive = this)
is SendAndReceive -> SuspendedOn(sendAndReceive = this.toJson())
is WaitForLedgerCommit -> SuspendedOn(waitForLedgerCommit = hash)
is ReceiveAll -> SuspendedOn(receiveAll = this)
// should not be possible but the compiler wont let it through without this case
else -> SuspendedOn()
}.also {
it.suspendedTimestamp = suspendedTimestamp
it.secondsSpentWaiting = TimeUnit.MILLISECONDS.toSeconds(Duration.between(suspendedTimestamp, now).toMillis())
}
}
private fun SerializedBytes<Any>.deserializeOrOutputPlaceholder() = try {
deserialize()
} catch (e: Exception) {
"*** Unable to deserialise message payload: ${e.message} ***"
}
@Suppress("unused")
private class ActiveSession(
val peer: Party,
val ourSessionId: SessionId,
val receivedMessages: List<ReceivedSessionMessage>,
val peerSessionId: SessionId?
)
@Suppress("unused")
private interface SessionIdMixin {
@get:JsonValue
val toLong: Long
}
@JsonAutoDetect(getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
@JsonIgnoreProperties("flowUsedForSessions")
private interface FlowLogicMixin
private object CheckpointDumperBeanModifier : BeanSerializerModifier() {
override fun changeProperties(config: SerializationConfig,
beanDesc: BeanDescription,
beanProperties: MutableList<BeanPropertyWriter>): MutableList<BeanPropertyWriter> {
// Remove references to any node singletons
beanProperties.removeIf { it.type.isTypeOrSubTypeOf(SerializeAsToken::class.java) }
if (FlowLogic::class.java.isAssignableFrom(beanDesc.beanClass)) {
beanProperties.removeIf {
it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap"
}
}
return beanProperties
}
}
private object FlowSessionImplSerializer : JsonSerializer<FlowSessionImpl>() {
override fun serialize(value: FlowSessionImpl, gen: JsonGenerator, serializers: SerializerProvider) {
gen.jsonObject {
writeObjectField("peer", value.counterparty)
}
}
override fun handledType(): Class<FlowSessionImpl> = FlowSessionImpl::class.java
}
private object AttachmentSerializer : JsonSerializer<Attachment>() {
override fun serialize(value: Attachment, gen: JsonGenerator, serializers: SerializerProvider) = gen.writeObject(value.id)
override fun handledType(): Class<Attachment> = Attachment::class.java
}
private object MapSerializer : JsonSerializer<Map<Any, Any>>() {
override fun serialize(map: Map<Any, Any>, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeStartArray(map.size)
map.forEach { key, value ->
gen.jsonObject {
writeObjectField("key", key)
writeObjectField("value", value)
}
}
gen.writeEndArray()
}
override fun handledType(): Class<Map<Any, Any>> = uncheckedCast(Map::class.java)
}
@JsonSerialize(using = SignedTransactionSerializer::class)
private interface SignedTransactionMixin
private class SignedTransactionSerializer : JsonSerializer<SignedTransaction>() {
override fun serialize(value: SignedTransaction, gen: JsonGenerator, serializers: SerializerProvider) {
val core = value.coreTransaction
val stxJson = when (core) {
is WireTransaction -> StxJson(wire = core, signatures = value.sigs)
is FilteredTransaction -> StxJson(filtered = core, signatures = value.sigs)
is NotaryChangeWireTransaction -> StxJson(notaryChangeWire = core, signatures = value.sigs)
is ContractUpgradeWireTransaction -> StxJson(contractUpgradeWire = core, signatures = value.sigs)
is ContractUpgradeFilteredTransaction -> StxJson(contractUpgradeFiltered = core, signatures = value.sigs)
else -> throw IllegalArgumentException("Don't know about ${core.javaClass}")
}
gen.writeObject(stxJson)
}
}
@JsonInclude(Include.NON_NULL)
private data class StxJson(
val wire: WireTransaction? = null,
val filtered: FilteredTransaction? = null,
val notaryChangeWire: NotaryChangeWireTransaction? = null,
val contractUpgradeWire: ContractUpgradeWireTransaction? = null,
val contractUpgradeFiltered: ContractUpgradeFilteredTransaction? = null,
val signatures: List<TransactionSignature>
) {
init {
val count = Booleans.countTrue(wire != null, filtered != null, notaryChangeWire != null, contractUpgradeWire != null, contractUpgradeFiltered != null)
require(count == 1) { this }
}
}
@JsonSerialize(using = WireTransactionSerializer::class)
private interface WireTransactionMixin
private class WireTransactionSerializer : JsonSerializer<WireTransaction>() {
override fun serialize(value: WireTransaction, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeObject(WireTransactionJson(
value.id,
value.notary,
value.inputs,
value.outputs,
value.commands,
value.timeWindow,
value.attachments,
value.privacySalt
))
}
}
private class WireTransactionJson(
val id: SecureHash,
val notary: Party?,
val inputs: List<StateRef>,
val outputs: List<TransactionState<*>>,
val commands: List<Command<*>>,
val timeWindow: TimeWindow?,
val attachments: List<SecureHash>,
val privacySalt: PrivacySalt
)
@JsonIgnoreProperties("flow", "fiber")
@JsonAutoDetect(getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
private interface FlowSessionInternalMixin
}
// Taken from newer corda version due to parent classes not being searched for fields
// needed to access `stack` inside `FlowStateMachineImpl`
private fun <T> Any.declaredField(name: String): DeclaredField<T> = DeclaredField(javaClass, name, this)
private class DeclaredField<T>(clazz: Class<*>, name: String, private val receiver: Any?) {
private val javaField = findField(name, clazz)
var value: T
get() {
synchronized(this) {
return javaField.accessible { uncheckedCast<Any?, T>(get(receiver)) }
}
}
set(value) {
synchronized(this) {
javaField.accessible {
set(receiver, value)
}
}
}
val name: String = javaField.name
private fun <RESULT> Field.accessible(action: Field.() -> RESULT): RESULT {
val accessible = isAccessible
isAccessible = true
try {
return action(this)
} finally {
isAccessible = accessible
}
}
@Throws(NoSuchFieldException::class)
private fun findField(fieldName: String, clazz: Class<*>?): Field {
if (clazz == null) {
throw NoSuchFieldException(fieldName)
}
return try {
return clazz.getDeclaredField(fieldName)
} catch (e: NoSuchFieldException) {
findField(fieldName, clazz.superclass)
}
}
}

View File

@ -3,7 +3,7 @@ package net.corda.node.shell
import net.corda.core.context.Actor
import net.corda.core.context.InvocationContext
import net.corda.core.identity.CordaX500Name
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.node.internal.security.Password
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.internal.security.tryAuthenticate
@ -11,7 +11,7 @@ import org.crsh.auth.AuthInfo
import org.crsh.auth.AuthenticationPlugin
import org.crsh.plugin.CRaSHPlugin
class CordaAuthenticationPlugin(private val rpcOps: CordaRPCOps, private val securityManager: RPCSecurityManager, private val nodeLegalName: CordaX500Name) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
class CordaAuthenticationPlugin(private val rpcOps: InternalCordaRPCOps, private val securityManager: RPCSecurityManager, private val nodeLegalName: CordaX500Name) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
override fun getImplementation(): AuthenticationPlugin<String> = this

View File

@ -1,9 +1,9 @@
package net.corda.node.shell
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.node.utilities.ANSIProgressRenderer
import org.crsh.auth.AuthInfo
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo {
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: InternalCordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo {
override fun isSuccessful(): Boolean = successful
}

View File

@ -18,6 +18,7 @@ import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
@ -81,7 +82,7 @@ object InteractiveShell {
private lateinit var node: StartedNode<Node>
@VisibleForTesting
internal lateinit var database: CordaPersistence
private lateinit var rpcOps: CordaRPCOps
private lateinit var rpcOps: InternalCordaRPCOps
private lateinit var securityManager: RPCSecurityManager
private lateinit var identityService: IdentityService
private var shell: Shell? = null
@ -91,7 +92,7 @@ object InteractiveShell {
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
* internals.
*/
fun startShell(configuration: NodeConfiguration, cordaRPCOps: CordaRPCOps, securityManager: RPCSecurityManager, identityService: IdentityService, database: CordaPersistence) {
fun startShell(configuration: NodeConfiguration, cordaRPCOps: InternalCordaRPCOps, securityManager: RPCSecurityManager, identityService: IdentityService, database: CordaPersistence) {
this.rpcOps = cordaRPCOps
this.securityManager = securityManager
this.identityService = identityService
@ -368,8 +369,8 @@ object InteractiveShell {
}
@JvmStatic
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps): Any? {
val parser = StringToMethodCallParser(CordaRPCOps::class.java, context.attributes["mapper"] as ObjectMapper)
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: InternalCordaRPCOps): Any? {
val parser = StringToMethodCallParser(InternalCordaRPCOps::class.java, context.attributes["mapper"] as ObjectMapper)
val cmd = input.joinToString(" ").trim { it <= ' ' }
if (cmd.toLowerCase().startsWith("startflow")) {

View File

@ -1,7 +1,7 @@
package net.corda.node.shell
import net.corda.core.context.InvocationContext
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.security.AuthorizingSubject
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
@ -11,9 +11,9 @@ import java.lang.reflect.Proxy
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
fun makeRPCOpsWithContext(cordaRPCOps: CordaRPCOps, invocationContext:InvocationContext, authorizingSubject: AuthorizingSubject) : CordaRPCOps {
fun makeRPCOpsWithContext(cordaRPCOps: InternalCordaRPCOps, invocationContext:InvocationContext, authorizingSubject: AuthorizingSubject) : InternalCordaRPCOps {
return Proxy.newProxyInstance(CordaRPCOps::class.java.classLoader, arrayOf(CordaRPCOps::class.java), { _, method, args ->
return Proxy.newProxyInstance(InternalCordaRPCOps::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java), { _, method, args ->
RPCContextRunner(invocationContext, authorizingSubject) {
try {
method.invoke(cordaRPCOps, *(args ?: arrayOf()))
@ -22,7 +22,7 @@ fun makeRPCOpsWithContext(cordaRPCOps: CordaRPCOps, invocationContext:Invocation
throw e.targetException
}
}.get().getOrThrow()
}) as CordaRPCOps
}) as InternalCordaRPCOps
}
private class RPCContextRunner<T>(val invocationContext: InvocationContext, val authorizingSubject: AuthorizingSubject, val block:() -> T): Thread() {

View File

@ -32,6 +32,7 @@ import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcAuthContext
import net.corda.node.services.rpc.CheckpointDumper
import net.corda.nodeapi.internal.config.User
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.expect
@ -82,7 +83,7 @@ class CordaRPCOpsImplTest {
fun setup() {
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas"))
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services)
rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services, CheckpointDumper(aliceNode.checkpointStorage, aliceNode.database, aliceNode.services))
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
mockNet.runNetwork()

View File

@ -13,8 +13,8 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.createDirectories
import net.corda.core.internal.createDirectory
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
@ -278,7 +278,7 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
return E2ETestKeyManagementService(identityService, keyPairs)
}
override fun startShell(rpcOps: CordaRPCOps) {
override fun startShell(rpcOps: InternalCordaRPCOps) {
//No mock shell
}