mirror of
https://github.com/corda/corda.git
synced 2025-01-31 16:35:43 +00:00
ENT-3496: dumpCheckpoints shell command (#5171)
Dumps all the node's checkpoints as JSON into a single zip file in the node's directory. The output contains: * All the fields for the top-level flow * The current sub-flow call stack, along with the current progress tracker step for each sub-flow * The event that suspended the flow, which if it's a send or sendAndReceive will show the payload that was sent * Low level information on the active sessions with other peers
This commit is contained in:
commit
2e02968c63
@ -239,15 +239,17 @@ open class StringToMethodCallParser<in T : Any> @JvmOverloads constructor(
|
||||
/** Returns a string-to-string map of commands to a string describing available parameter types. */
|
||||
val availableCommands: Map<String, String>
|
||||
get() {
|
||||
return methodMap.entries().map { entry ->
|
||||
val (name, args) = entry // TODO: Kotlin 1.1
|
||||
val argStr = if (args.parameterCount == 0) "" else {
|
||||
val paramNames = methodParamNames[name]!!
|
||||
val typeNames = args.parameters.map { it.type.simpleName }
|
||||
val paramTypes = paramNames.zip(typeNames)
|
||||
paramTypes.joinToString(", ") { "${it.first}: ${it.second}" }
|
||||
return methodMap.entries().mapNotNull { (name, args) ->
|
||||
if (args.parameterCount == 0) {
|
||||
Pair(name, "")
|
||||
} else {
|
||||
methodParamNames[name]?. let { params ->
|
||||
val typeNames = args.parameters.map { it.type.simpleName }
|
||||
val paramTypes = params.zip(typeNames)
|
||||
val paramNames = paramTypes.joinToString(", ") { "${it.first}: ${it.second}" }
|
||||
Pair(name, paramNames)
|
||||
}
|
||||
}
|
||||
Pair(name, argStr)
|
||||
}.toMap()
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
package net.corda.client.jackson.internal
|
||||
|
||||
import com.fasterxml.jackson.annotation.*
|
||||
import com.fasterxml.jackson.annotation.JsonCreator.Mode.*
|
||||
import com.fasterxml.jackson.annotation.JsonCreator.Mode.DISABLED
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include
|
||||
import com.fasterxml.jackson.core.JsonGenerator
|
||||
import com.fasterxml.jackson.core.JsonParseException
|
||||
@ -34,6 +34,7 @@ import net.corda.core.internal.DigitalSignatureWithCert
|
||||
import net.corda.core.internal.createComponentGroups
|
||||
import net.corda.core.internal.kotlinObjectInstance
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
@ -43,7 +44,8 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.parseAsHex
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.serialization.internal.AllWhitelist
|
||||
import net.corda.serialization.internal.amqp.*
|
||||
import net.corda.serialization.internal.amqp.SerializerFactoryBuilder
|
||||
import net.corda.serialization.internal.amqp.hasCordaSerializable
|
||||
import java.math.BigDecimal
|
||||
import java.security.PublicKey
|
||||
import java.security.cert.CertPath
|
||||
@ -97,7 +99,7 @@ private class CordaSerializableBeanSerializerModifier : BeanSerializerModifier()
|
||||
beanDesc: BeanDescription,
|
||||
beanProperties: MutableList<BeanPropertyWriter>): MutableList<BeanPropertyWriter> {
|
||||
val beanClass = beanDesc.beanClass
|
||||
if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null) {
|
||||
if (hasCordaSerializable(beanClass) && beanClass.kotlinObjectInstance == null && !SerializeAsToken::class.java.isAssignableFrom(beanClass)) {
|
||||
val typeInformation = serializerFactory.getTypeInformation(beanClass)
|
||||
val properties = typeInformation.propertiesOrEmptyMap
|
||||
val amqpProperties = properties.mapNotNull { (name, property) ->
|
||||
|
@ -8,6 +8,7 @@ import net.corda.core.context.Actor
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.PLATFORM_VERSION
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.serialization.SerializationCustomSerializer
|
||||
@ -310,7 +311,7 @@ class CordaRPCClient private constructor(
|
||||
}
|
||||
}
|
||||
|
||||
private fun getRpcClient(): RPCClient<CordaRPCOps> {
|
||||
private fun getRpcClient(): RPCClient<InternalCordaRPCOps> {
|
||||
return when {
|
||||
// Client->RPC broker
|
||||
haAddressPool.isEmpty() -> RPCClient(
|
||||
@ -385,7 +386,7 @@ class CordaRPCClient private constructor(
|
||||
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
|
||||
*/
|
||||
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?): CordaRPCConnection {
|
||||
return CordaRPCConnection(getRpcClient().start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
|
||||
return CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3,6 +3,7 @@ package net.corda.client.rpc.internal
|
||||
import net.corda.client.rpc.*
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.internal.times
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
@ -44,7 +45,7 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
private val reconnectingRPCConnection: ReconnectingRPCConnection,
|
||||
private val observersPool: ExecutorService,
|
||||
private val userPool: Boolean
|
||||
) : AutoCloseable, CordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
|
||||
) : AutoCloseable, InternalCordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
|
||||
|
||||
// Constructors that mirror CordaRPCClient.
|
||||
constructor(
|
||||
@ -77,11 +78,11 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
const val MAX_RETRY_ATTEMPTS_ON_AUTH_ERROR = 3
|
||||
|
||||
private val log = contextLogger()
|
||||
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): CordaRPCOps {
|
||||
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): InternalCordaRPCOps {
|
||||
return Proxy.newProxyInstance(
|
||||
this::class.java.classLoader,
|
||||
arrayOf(CordaRPCOps::class.java),
|
||||
ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as CordaRPCOps
|
||||
arrayOf(InternalCordaRPCOps::class.java),
|
||||
ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as InternalCordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,12 @@
|
||||
package net.corda.core.internal.messaging
|
||||
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
|
||||
/**
|
||||
* Contains internal RPC functions that should not be publicly exposed in [CordaRPCOps]
|
||||
*/
|
||||
interface InternalCordaRPCOps : CordaRPCOps {
|
||||
|
||||
/** Dump all the current flow checkpoints as JSON into a zip file in the node's log directory. */
|
||||
fun dumpCheckpoints()
|
||||
}
|
@ -16,6 +16,10 @@ Version 5.0
|
||||
* Removed ``finance-workflows`` dependency on jackson library. The functions that used jackson (e.g. ``FinanceJSONSupport``) have been moved
|
||||
into IRS Demo.
|
||||
|
||||
* Information about checkpointed flows can be retrieved from the shell. Calling ``dumpCheckpoints`` will create a zip file inside the node's
|
||||
``log`` directory. This zip will contain a JSON representation of each checkpointed flow. This information can then be used to determine the
|
||||
state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention.
|
||||
|
||||
.. _changelog_v4.2:
|
||||
|
||||
Version 4.2
|
||||
|
@ -281,6 +281,15 @@ a drain is complete there should be no outstanding checkpoints or running flows.
|
||||
A node can be drained or undrained via RPC using the ``setFlowsDrainingModeEnabled`` method, and via the shell using
|
||||
the standard ``run`` command to invoke the RPC. See :doc:`shell` to learn more.
|
||||
|
||||
To assist in draining a node, the ``dumpCheckpoints`` shell command will output JSON representations of each checkpointed flow.
|
||||
A zip containing the JSON files is created in the ``logs`` directory of the node. This information can then be used to determine the
|
||||
state of stuck flows or flows that experienced internal errors and were kept in the node for manual intervention. To drain these flows,
|
||||
the node will need to be restarted or the flow will need to be removed using ``killFlow``.
|
||||
|
||||
.. warning:: Deleting checkpoints manually or via ``killFlow`` can lead to an inconsistent ledger among transacting parties. Great care
|
||||
and coordination with a flow's counterparties must be taken to ensure that a initiating flow and flows responding to it are correctly
|
||||
removed. This experience will be improved in the future. Making it easier to kill flows while notifying their counterparties.
|
||||
|
||||
.. _contract_upgrading_ref:
|
||||
|
||||
Contract and state versioning
|
||||
|
@ -21,6 +21,7 @@ import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.internal.notary.NotaryService
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.*
|
||||
@ -62,6 +63,7 @@ import net.corda.node.services.network.NetworkMapUpdater
|
||||
import net.corda.node.services.network.NodeInfoWatcher
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.persistence.*
|
||||
import net.corda.node.services.rpc.CheckpointDumper
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
@ -259,16 +261,23 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
/** The implementation of the [CordaRPCOps] interface used by this node. */
|
||||
open fun makeRPCOps(cordappLoader: CordappLoader): CordaRPCOps {
|
||||
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit { stop() } }.also { it.closeOnStop() }
|
||||
val proxies = mutableListOf<(CordaRPCOps) -> CordaRPCOps>()
|
||||
open fun makeRPCOps(cordappLoader: CordappLoader, checkpointDumper: CheckpointDumper): CordaRPCOps {
|
||||
val ops: InternalCordaRPCOps = CordaRPCOpsImpl(
|
||||
services,
|
||||
smm,
|
||||
flowStarter,
|
||||
checkpointDumper
|
||||
) {
|
||||
shutdownExecutor.submit(::stop)
|
||||
}.also { it.closeOnStop() }
|
||||
val proxies = mutableListOf<(InternalCordaRPCOps) -> InternalCordaRPCOps>()
|
||||
// Mind that order is relevant here.
|
||||
proxies += ::AuthenticatedRpcOpsProxy
|
||||
if (!configuration.devMode) {
|
||||
proxies += { it -> ExceptionMaskingRpcOpsProxy(it, true) }
|
||||
proxies += { ExceptionMaskingRpcOpsProxy(it, true) }
|
||||
}
|
||||
proxies += { it -> ExceptionSerialisingRpcOpsProxy(it, configuration.devMode) }
|
||||
proxies += { it -> ThreadContextAdjustingRpcOpsProxy(it, cordappLoader.appClassLoader) }
|
||||
proxies += { ExceptionSerialisingRpcOpsProxy(it, configuration.devMode) }
|
||||
proxies += { ThreadContextAdjustingRpcOpsProxy(it, cordappLoader.appClassLoader) }
|
||||
return proxies.fold(ops) { delegate, decorate -> decorate(delegate) }
|
||||
}
|
||||
|
||||
@ -329,7 +338,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
installCoreFlows()
|
||||
registerCordappFlows()
|
||||
services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
|
||||
val rpcOps = makeRPCOps(cordappLoader)
|
||||
val checkpointDumper = CheckpointDumper(checkpointStorage, database, services)
|
||||
val rpcOps = makeRPCOps(cordappLoader, checkpointDumper)
|
||||
startShell()
|
||||
networkMapClient?.start(trustRoot)
|
||||
|
||||
@ -390,7 +400,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
tokenizableServices = null
|
||||
|
||||
verifyCheckpointsCompatible(frozenTokenizableServices)
|
||||
|
||||
checkpointDumper.start(frozenTokenizableServices)
|
||||
smm.start(frozenTokenizableServices)
|
||||
// Shut down the SMM so no Fibers are scheduled.
|
||||
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||
|
@ -17,6 +17,7 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.core.internal.RPC_UPLOADER
|
||||
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.internal.sign
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.NetworkParameters
|
||||
@ -31,6 +32,7 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.FlowStarter
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.rpc.CheckpointDumper
|
||||
import net.corda.node.services.rpc.context
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.exceptions.NonRpcFlowException
|
||||
@ -51,8 +53,9 @@ internal class CordaRPCOpsImpl(
|
||||
private val services: ServiceHubInternal,
|
||||
private val smm: StateMachineManager,
|
||||
private val flowStarter: FlowStarter,
|
||||
private val checkpointDumper: CheckpointDumper,
|
||||
private val shutdownNode: () -> Unit
|
||||
) : CordaRPCOps, AutoCloseable {
|
||||
) : InternalCordaRPCOps, AutoCloseable {
|
||||
|
||||
private companion object {
|
||||
private val logger = loggerFor<CordaRPCOpsImpl>()
|
||||
@ -130,6 +133,8 @@ internal class CordaRPCOpsImpl(
|
||||
return services.validatedTransactions.track()
|
||||
}
|
||||
|
||||
override fun dumpCheckpoints() = checkpointDumper.dump()
|
||||
|
||||
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
|
||||
val (snapshot, updates) = stateMachinesFeed()
|
||||
updates.notUsed()
|
||||
@ -298,17 +303,21 @@ internal class CordaRPCOpsImpl(
|
||||
override fun shutdown() = terminate(false)
|
||||
|
||||
override fun terminate(drainPendingFlows: Boolean) {
|
||||
|
||||
if (drainPendingFlows) {
|
||||
logger.info("Waiting for pending flows to complete before shutting down.")
|
||||
setFlowsDrainingModeEnabled(true)
|
||||
drainingShutdownHook.set(pendingFlowsCount().updates.doOnNext {(completed, total) ->
|
||||
logger.info("Pending flows progress before shutdown: $completed / $total.")
|
||||
}.doOnCompleted { setPersistentDrainingModeProperty(false, false) }.doOnCompleted(::cancelDrainingShutdownHook).doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") }.doOnCompleted(shutdownNode::invoke).subscribe({
|
||||
// Nothing to do on each update here, only completion matters.
|
||||
}, { error ->
|
||||
logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error)
|
||||
}))
|
||||
val subscription = pendingFlowsCount()
|
||||
.updates
|
||||
.doOnNext { (completed, total) -> logger.info("Pending flows progress before shutdown: $completed / $total.") }
|
||||
.doOnCompleted { setPersistentDrainingModeProperty(enabled = false, propagateChange = false) }
|
||||
.doOnCompleted(::cancelDrainingShutdownHook)
|
||||
.doOnCompleted { logger.info("No more pending flows to drain. Shutting down.") }
|
||||
.doOnCompleted(shutdownNode::invoke)
|
||||
.subscribe(
|
||||
{ }, // Nothing to do on each update here, only completion matters.
|
||||
{ error -> logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }
|
||||
)
|
||||
drainingShutdownHook.set(subscription)
|
||||
} else {
|
||||
shutdownNode.invoke()
|
||||
}
|
||||
@ -317,19 +326,19 @@ internal class CordaRPCOpsImpl(
|
||||
override fun isWaitingForShutdown() = drainingShutdownHook.get() != null
|
||||
|
||||
override fun close() {
|
||||
|
||||
cancelDrainingShutdownHook()
|
||||
}
|
||||
|
||||
private fun cancelDrainingShutdownHook() {
|
||||
|
||||
drainingShutdownHook.getAndSet(null)?.let {
|
||||
it.unsubscribe()
|
||||
logger.info("Cancelled draining shutdown hook.")
|
||||
}
|
||||
}
|
||||
|
||||
private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) = services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange)
|
||||
private fun setPersistentDrainingModeProperty(enabled: Boolean, propagateChange: Boolean) {
|
||||
services.nodeProperties.flowsDrainingMode.setEnabled(enabled, propagateChange)
|
||||
}
|
||||
|
||||
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
|
||||
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
|
||||
|
@ -2,6 +2,7 @@ package net.corda.node.internal.rpc.proxies
|
||||
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.node.internal.InvocationHandlerTemplate
|
||||
import net.corda.node.services.rpc.RpcAuthContext
|
||||
@ -12,7 +13,7 @@ import java.lang.reflect.Proxy
|
||||
/**
|
||||
* Implementation of [CordaRPCOps] that checks authorisation.
|
||||
*/
|
||||
internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : CordaRPCOps by proxy(delegate, ::rpcContext) {
|
||||
internal class AuthenticatedRpcOpsProxy(private val delegate: InternalCordaRPCOps) : InternalCordaRPCOps by proxy(delegate, ::rpcContext) {
|
||||
/**
|
||||
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed
|
||||
* to be present.
|
||||
@ -32,13 +33,13 @@ internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : Cor
|
||||
}
|
||||
|
||||
private companion object {
|
||||
private fun proxy(delegate: CordaRPCOps, context: () -> RpcAuthContext): CordaRPCOps {
|
||||
private fun proxy(delegate: InternalCordaRPCOps, context: () -> RpcAuthContext): InternalCordaRPCOps {
|
||||
val handler = PermissionsEnforcingInvocationHandler(delegate, context)
|
||||
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java), handler) as InternalCordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private class PermissionsEnforcingInvocationHandler(override val delegate: CordaRPCOps, private val context: () -> RpcAuthContext) : InvocationHandlerTemplate {
|
||||
private class PermissionsEnforcingInvocationHandler(override val delegate: InternalCordaRPCOps, private val context: () -> RpcAuthContext) : InvocationHandlerTemplate {
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?) = guard(method.name, context) { super.invoke(proxy, method, arguments) }
|
||||
}
|
||||
}
|
||||
|
@ -1,22 +1,14 @@
|
||||
package net.corda.node.internal.rpc.proxies
|
||||
|
||||
import net.corda.core.ClientRelevantError
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.*
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.doOnError
|
||||
import net.corda.core.flows.IdentifiableException
|
||||
import net.corda.core.internal.concurrent.doOnError
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.internal.declaredField
|
||||
import net.corda.core.mapErrors
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowHandleImpl
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.FlowProgressHandleImpl
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.InvocationHandlerTemplate
|
||||
import net.corda.nodeapi.exceptions.InternalNodeException
|
||||
@ -25,7 +17,7 @@ import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy.newProxyInstance
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
internal class ExceptionMaskingRpcOpsProxy(private val delegate: CordaRPCOps, doLog: Boolean) : CordaRPCOps by proxy(delegate, doLog) {
|
||||
internal class ExceptionMaskingRpcOpsProxy(private val delegate: InternalCordaRPCOps, doLog: Boolean) : InternalCordaRPCOps by proxy(delegate, doLog) {
|
||||
private companion object {
|
||||
private val logger = loggerFor<ExceptionMaskingRpcOpsProxy>()
|
||||
|
||||
@ -34,13 +26,13 @@ internal class ExceptionMaskingRpcOpsProxy(private val delegate: CordaRPCOps, do
|
||||
TransactionVerificationException::class
|
||||
)
|
||||
|
||||
private fun proxy(delegate: CordaRPCOps, doLog: Boolean): CordaRPCOps {
|
||||
private fun proxy(delegate: InternalCordaRPCOps, doLog: Boolean): InternalCordaRPCOps {
|
||||
val handler = ErrorObfuscatingInvocationHandler(delegate, whitelist, doLog)
|
||||
return newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||
return newProxyInstance(delegate::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java), handler) as InternalCordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private class ErrorObfuscatingInvocationHandler(override val delegate: CordaRPCOps, private val whitelist: Set<KClass<*>>, private val doLog: Boolean) : InvocationHandlerTemplate {
|
||||
private class ErrorObfuscatingInvocationHandler(override val delegate: InternalCordaRPCOps, private val whitelist: Set<KClass<*>>, private val doLog: Boolean) : InvocationHandlerTemplate {
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
||||
try {
|
||||
val result = super.invoke(proxy, method, arguments)
|
||||
|
@ -1,18 +1,13 @@
|
||||
package net.corda.node.internal.rpc.proxies
|
||||
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.CordaThrowable
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.doOnError
|
||||
import net.corda.core.internal.concurrent.doOnError
|
||||
import net.corda.core.internal.concurrent.mapError
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.mapErrors
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowHandleImpl
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.FlowProgressHandleImpl
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.InvocationHandlerTemplate
|
||||
@ -20,17 +15,17 @@ import rx.Observable
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy.newProxyInstance
|
||||
|
||||
internal class ExceptionSerialisingRpcOpsProxy(private val delegate: CordaRPCOps, doLog: Boolean) : CordaRPCOps by proxy(delegate, doLog) {
|
||||
internal class ExceptionSerialisingRpcOpsProxy(private val delegate: InternalCordaRPCOps, doLog: Boolean) : InternalCordaRPCOps by proxy(delegate, doLog) {
|
||||
private companion object {
|
||||
private val logger = loggerFor<ExceptionSerialisingRpcOpsProxy>()
|
||||
|
||||
private fun proxy(delegate: CordaRPCOps, doLog: Boolean): CordaRPCOps {
|
||||
private fun proxy(delegate: InternalCordaRPCOps, doLog: Boolean): InternalCordaRPCOps {
|
||||
val handler = ErrorSerialisingInvocationHandler(delegate, doLog)
|
||||
return newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||
return newProxyInstance(delegate::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java), handler) as InternalCordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private class ErrorSerialisingInvocationHandler(override val delegate: CordaRPCOps, private val doLog: Boolean) : InvocationHandlerTemplate {
|
||||
private class ErrorSerialisingInvocationHandler(override val delegate: InternalCordaRPCOps, private val doLog: Boolean) : InvocationHandlerTemplate {
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
||||
try {
|
||||
val result = super.invoke(proxy, method, arguments)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.internal.rpc.proxies
|
||||
|
||||
import net.corda.core.internal.executeWithThreadContextClassLoader
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.node.internal.InvocationHandlerTemplate
|
||||
import java.lang.reflect.Method
|
||||
@ -12,15 +13,15 @@ import java.lang.reflect.Proxy
|
||||
* without sensible fallbacks to the classloader of the current instance.
|
||||
* If clients' CorDapps use one of these libraries, this temporary adjustment can ensure that any provided classes from these libraries will be available during RPC calls.
|
||||
*/
|
||||
internal class ThreadContextAdjustingRpcOpsProxy(private val delegate: CordaRPCOps, private val classLoader: ClassLoader): CordaRPCOps by proxy(delegate, classLoader) {
|
||||
internal class ThreadContextAdjustingRpcOpsProxy(private val delegate: InternalCordaRPCOps, private val classLoader: ClassLoader): InternalCordaRPCOps by proxy(delegate, classLoader) {
|
||||
private companion object {
|
||||
private fun proxy(delegate: CordaRPCOps, classLoader: ClassLoader): CordaRPCOps {
|
||||
private fun proxy(delegate: InternalCordaRPCOps, classLoader: ClassLoader): InternalCordaRPCOps {
|
||||
val handler = ThreadContextAdjustingRpcOpsProxy.ThreadContextAdjustingInvocationHandler(delegate, classLoader)
|
||||
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(CordaRPCOps::class.java), handler) as CordaRPCOps
|
||||
return Proxy.newProxyInstance(delegate::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java), handler) as InternalCordaRPCOps
|
||||
}
|
||||
}
|
||||
|
||||
private class ThreadContextAdjustingInvocationHandler(override val delegate: CordaRPCOps, private val classLoader: ClassLoader) : InvocationHandlerTemplate {
|
||||
private class ThreadContextAdjustingInvocationHandler(override val delegate: InternalCordaRPCOps, private val classLoader: ClassLoader) : InvocationHandlerTemplate {
|
||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
||||
return executeWithThreadContextClassLoader(this.classLoader) { super.invoke(proxy, method, arguments) }
|
||||
}
|
||||
|
@ -0,0 +1,314 @@
|
||||
package net.corda.node.services.rpc
|
||||
|
||||
import co.paralleluniverse.fibers.Stack
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
|
||||
import com.fasterxml.jackson.annotation.JsonFormat
|
||||
import com.fasterxml.jackson.annotation.JsonInclude
|
||||
import com.fasterxml.jackson.annotation.JsonInclude.Include
|
||||
import com.fasterxml.jackson.annotation.JsonValue
|
||||
import com.fasterxml.jackson.core.JsonGenerator
|
||||
import com.fasterxml.jackson.core.util.DefaultIndenter
|
||||
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
|
||||
import com.fasterxml.jackson.databind.*
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule
|
||||
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter
|
||||
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier
|
||||
import net.corda.client.jackson.JacksonSupport
|
||||
import net.corda.client.jackson.internal.jsonObject
|
||||
import net.corda.core.context.InvocationOrigin
|
||||
import net.corda.core.contracts.Attachment
|
||||
import net.corda.core.contracts.ScheduledStateRef
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationContext
|
||||
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
|
||||
import net.corda.core.serialization.internal.checkpointDeserialize
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.statemachine.*
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
import net.corda.serialization.internal.withTokenContext
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.ZoneOffset.UTC
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.zip.ZipEntry
|
||||
import java.util.zip.ZipOutputStream
|
||||
|
||||
class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHubInternal) {
|
||||
companion object {
|
||||
private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val lock = AtomicInteger(0)
|
||||
|
||||
private lateinit var checkpointSerializationContext: CheckpointSerializationContext
|
||||
private lateinit var writer: ObjectWriter
|
||||
|
||||
fun start(tokenizableServices: List<Any>) {
|
||||
checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
|
||||
CheckpointSerializeAsTokenContextImpl(
|
||||
tokenizableServices,
|
||||
CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER,
|
||||
CheckpointSerializationDefaults.CHECKPOINT_CONTEXT,
|
||||
serviceHub
|
||||
)
|
||||
)
|
||||
|
||||
val mapper = JacksonSupport.createNonRpcMapper()
|
||||
mapper.registerModule(SimpleModule().apply {
|
||||
setSerializerModifier(CheckpointDumperBeanModifier)
|
||||
addSerializer(FlowSessionImplSerializer)
|
||||
addSerializer(MapSerializer)
|
||||
addSerializer(AttachmentSerializer)
|
||||
setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java)
|
||||
setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java)
|
||||
})
|
||||
val prettyPrinter = DefaultPrettyPrinter().apply {
|
||||
indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE)
|
||||
}
|
||||
writer = mapper.writer(prettyPrinter)
|
||||
}
|
||||
|
||||
fun dump() {
|
||||
val now = serviceHub.clock.instant()
|
||||
val file = serviceHub.configuration.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip"
|
||||
try {
|
||||
if (lock.getAndIncrement() == 0 && !file.exists()) {
|
||||
database.transaction {
|
||||
checkpointStorage.getAllCheckpoints().use { stream ->
|
||||
ZipOutputStream(file.outputStream()).use { zip ->
|
||||
stream.forEach { (runId, serialisedCheckpoint) ->
|
||||
val checkpoint = serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
val json = checkpoint.toJson(runId.uuid, now)
|
||||
val jsonBytes = writer.writeValueAsBytes(json)
|
||||
zip.putNextEntry(ZipEntry("${json.flowLogicClass.simpleName}-${runId.uuid}.json"))
|
||||
zip.write(jsonBytes)
|
||||
zip.closeEntry()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.info("Flow dump already in progress, skipping current call")
|
||||
}
|
||||
} finally {
|
||||
lock.decrementAndGet()
|
||||
}
|
||||
}
|
||||
|
||||
private fun Checkpoint.toJson(id: UUID, now: Instant): CheckpointJson {
|
||||
val (fiber, flowLogic) = when (flowState) {
|
||||
is FlowState.Unstarted -> {
|
||||
null to flowState.frozenFlowLogic.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
}
|
||||
is FlowState.Started -> {
|
||||
val fiber = flowState.frozenFiber.checkpointDeserialize(context = checkpointSerializationContext)
|
||||
fiber to fiber.logic
|
||||
}
|
||||
}
|
||||
|
||||
val flowCallStack = if (fiber != null) {
|
||||
// Poke into Quasar's stack and find the object references to the sub-flows so that we can correctly get the current progress
|
||||
// step for each sub-call.
|
||||
val stackObjects = fiber.declaredField<Stack>("stack").value.declaredField<Array<*>>("dataObject").value
|
||||
subFlowStack.map { subFlow ->
|
||||
val subFlowLogic = stackObjects.find(subFlow.flowClass::isInstance) as? FlowLogic<*>
|
||||
val currentStep = subFlowLogic?.progressTracker?.currentStep
|
||||
FlowCall(subFlow.flowClass, if (currentStep == ProgressTracker.UNSTARTED) null else currentStep?.label)
|
||||
}.reversed()
|
||||
} else {
|
||||
emptyList()
|
||||
}
|
||||
return CheckpointJson(
|
||||
id,
|
||||
flowLogic.javaClass,
|
||||
flowLogic,
|
||||
flowCallStack,
|
||||
(flowState as? FlowState.Started)?.flowIORequest?.toSuspendedOn(suspendedTimestamp(), now),
|
||||
invocationContext.origin.toOrigin(),
|
||||
ourIdentity,
|
||||
sessions.mapNotNull { it.value.toActiveSession(it.key) },
|
||||
errorState as? ErrorState.Errored
|
||||
)
|
||||
}
|
||||
|
||||
private fun Checkpoint.suspendedTimestamp(): Instant = invocationContext.trace.invocationId.timestamp
|
||||
|
||||
@Suppress("unused")
|
||||
private class FlowCall(val flowClass: Class<*>, val progressStep: String?)
|
||||
|
||||
@Suppress("unused")
|
||||
@JsonInclude(Include.NON_NULL)
|
||||
private class Origin(
|
||||
val rpc: String? = null,
|
||||
val peer: CordaX500Name? = null,
|
||||
val service: String? = null,
|
||||
val scheduled: ScheduledStateRef? = null,
|
||||
val shell: InvocationOrigin.Shell? = null
|
||||
)
|
||||
|
||||
private fun InvocationOrigin.toOrigin(): Origin {
|
||||
return when (this) {
|
||||
is InvocationOrigin.RPC -> Origin(rpc = actor.id.value)
|
||||
is InvocationOrigin.Peer -> Origin(peer = party)
|
||||
is InvocationOrigin.Service -> Origin(service = serviceClassName)
|
||||
is InvocationOrigin.Scheduled -> Origin(scheduled = scheduledState)
|
||||
is InvocationOrigin.Shell -> Origin(shell = this)
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
private class CheckpointJson(
|
||||
val id: UUID,
|
||||
val flowLogicClass: Class<FlowLogic<*>>,
|
||||
val flowLogic: FlowLogic<*>,
|
||||
val flowCallStack: List<FlowCall>,
|
||||
val suspendedOn: SuspendedOn?,
|
||||
val origin: Origin,
|
||||
val ourIdentity: Party,
|
||||
val activeSessions: List<ActiveSession>,
|
||||
val errored: ErrorState.Errored?
|
||||
)
|
||||
|
||||
@Suppress("unused")
|
||||
@JsonInclude(Include.NON_NULL)
|
||||
private class SuspendedOn(
|
||||
val send: List<SendJson>? = null,
|
||||
val receive: NonEmptySet<FlowSession>? = null,
|
||||
val sendAndReceive: List<SendJson>? = null,
|
||||
val waitForLedgerCommit: SecureHash? = null,
|
||||
val waitForStateConsumption: Set<StateRef>? = null,
|
||||
val getFlowInfo: NonEmptySet<FlowSession>? = null,
|
||||
val sleepTill: Instant? = null,
|
||||
val waitForSessionConfirmations: FlowIORequest.WaitForSessionConfirmations? = null,
|
||||
val customOperation: FlowIORequest.ExecuteAsyncOperation<*>? = null,
|
||||
val forceCheckpoint: FlowIORequest.ForceCheckpoint? = null
|
||||
) {
|
||||
@JsonFormat(pattern ="yyyy-MM-dd'T'HH:mm:ss", timezone = "UTC")
|
||||
lateinit var suspendedTimestamp: Instant
|
||||
var secondsSpentWaiting: Long = 0
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
private class SendJson(val session: FlowSession, val sentPayloadType: Class<*>, val sentPayload: Any)
|
||||
|
||||
private fun FlowIORequest<*>.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn {
|
||||
fun Map<FlowSession, SerializedBytes<Any>>.toJson(): List<SendJson> {
|
||||
return map {
|
||||
val payload = it.value.deserialize()
|
||||
SendJson(it.key, payload.javaClass, payload)
|
||||
}
|
||||
}
|
||||
return when (this) {
|
||||
is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson())
|
||||
is FlowIORequest.Receive -> SuspendedOn(receive = sessions)
|
||||
is FlowIORequest.SendAndReceive -> SuspendedOn(sendAndReceive = sessionToMessage.toJson())
|
||||
is FlowIORequest.WaitForLedgerCommit -> SuspendedOn(waitForLedgerCommit = hash)
|
||||
is FlowIORequest.GetFlowInfo -> SuspendedOn(getFlowInfo = sessions)
|
||||
is FlowIORequest.Sleep -> SuspendedOn(sleepTill = wakeUpAfter)
|
||||
is FlowIORequest.WaitForSessionConfirmations -> SuspendedOn(waitForSessionConfirmations = this)
|
||||
is FlowIORequest.ForceCheckpoint -> SuspendedOn(forceCheckpoint = this)
|
||||
is FlowIORequest.ExecuteAsyncOperation -> {
|
||||
when (operation) {
|
||||
is WaitForStateConsumption -> SuspendedOn(waitForStateConsumption = (operation as WaitForStateConsumption).stateRefs)
|
||||
else -> SuspendedOn(customOperation = this)
|
||||
}
|
||||
}
|
||||
}.also {
|
||||
it.suspendedTimestamp = suspendedTimestamp
|
||||
it.secondsSpentWaiting = TimeUnit.MILLISECONDS.toSeconds(Duration.between(suspendedTimestamp, now).toMillis())
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
private class ActiveSession(
|
||||
val peer: Party,
|
||||
val ourSessionId: SessionId,
|
||||
val receivedMessages: List<DataSessionMessage>,
|
||||
val errors: List<FlowError>,
|
||||
val peerFlowInfo: FlowInfo,
|
||||
val peerSessionId: SessionId?
|
||||
)
|
||||
|
||||
private fun SessionState.toActiveSession(sessionId: SessionId): ActiveSession? {
|
||||
return if (this is SessionState.Initiated) {
|
||||
val peerSessionId = (initiatedState as? InitiatedSessionState.Live)?.peerSinkSessionId
|
||||
ActiveSession(peerParty, sessionId, receivedMessages, errors, peerFlowInfo, peerSessionId)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("unused")
|
||||
private interface SessionIdMixin {
|
||||
@get:JsonValue
|
||||
val toLong: Long
|
||||
}
|
||||
|
||||
@JsonAutoDetect(getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
|
||||
private interface FlowLogicMixin
|
||||
|
||||
private object CheckpointDumperBeanModifier : BeanSerializerModifier() {
|
||||
override fun changeProperties(config: SerializationConfig,
|
||||
beanDesc: BeanDescription,
|
||||
beanProperties: MutableList<BeanPropertyWriter>): MutableList<BeanPropertyWriter> {
|
||||
// Remove references to any node singletons
|
||||
beanProperties.removeIf { it.type.isTypeOrSubTypeOf(SerializeAsToken::class.java) }
|
||||
if (FlowLogic::class.java.isAssignableFrom(beanDesc.beanClass)) {
|
||||
beanProperties.removeIf {
|
||||
it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap"
|
||||
}
|
||||
}
|
||||
return beanProperties
|
||||
}
|
||||
}
|
||||
|
||||
private object FlowSessionImplSerializer : JsonSerializer<FlowSessionImpl>() {
|
||||
override fun serialize(value: FlowSessionImpl, gen: JsonGenerator, serializers: SerializerProvider) {
|
||||
gen.jsonObject {
|
||||
writeObjectField("peer", value.counterparty)
|
||||
writeObjectField("ourSessionId", value.sourceSessionId)
|
||||
}
|
||||
}
|
||||
override fun handledType(): Class<FlowSessionImpl> = FlowSessionImpl::class.java
|
||||
}
|
||||
|
||||
private object AttachmentSerializer : JsonSerializer<Attachment>() {
|
||||
override fun serialize(value: Attachment, gen: JsonGenerator, serializers: SerializerProvider) = gen.writeObject(value.id)
|
||||
override fun handledType(): Class<Attachment> = Attachment::class.java
|
||||
}
|
||||
|
||||
private object MapSerializer : JsonSerializer<Map<Any, Any>>() {
|
||||
override fun serialize(map: Map<Any, Any>, gen: JsonGenerator, serializers: SerializerProvider) {
|
||||
gen.writeStartArray(map.size)
|
||||
map.forEach { key, value ->
|
||||
gen.jsonObject {
|
||||
writeObjectField("key", key)
|
||||
writeObjectField("value", value)
|
||||
}
|
||||
}
|
||||
gen.writeEndArray()
|
||||
}
|
||||
override fun handledType(): Class<Map<Any, Any>> = uncheckedCast(Map::class.java)
|
||||
}
|
||||
}
|
@ -3,7 +3,7 @@ package net.corda.node.internal.rpc.proxies
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.Test
|
||||
import org.mockito.Mockito.`when`
|
||||
@ -15,7 +15,7 @@ class ThreadContextAdjustingRpcOpsProxyTest {
|
||||
private val proxy = ThreadContextAdjustingRpcOpsProxy(coreOps, mockClassloader)
|
||||
|
||||
|
||||
private interface InstrumentedCordaRPCOps: CordaRPCOps {
|
||||
private interface InstrumentedCordaRPCOps: InternalCordaRPCOps {
|
||||
fun getThreadContextClassLoader(): ClassLoader = Thread.currentThread().contextClassLoader
|
||||
}
|
||||
|
||||
|
@ -1,19 +1,28 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory
|
||||
import com.google.common.io.Files
|
||||
import com.jcraft.jsch.ChannelExec
|
||||
import com.jcraft.jsch.JSch
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.doAnswer
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import net.corda.client.jackson.JacksonSupport
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.Permissions.Companion.all
|
||||
import net.corda.node.services.config.shell.toShellConfig
|
||||
@ -22,6 +31,8 @@ import net.corda.node.utilities.saveToKeyStore
|
||||
import net.corda.node.utilities.saveToTrustStore
|
||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.NodeHandleInternal
|
||||
@ -33,11 +44,15 @@ import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.bouncycastle.util.io.Streams
|
||||
import org.crsh.text.RenderPrintWriter
|
||||
import org.junit.Before
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import java.util.zip.ZipFile
|
||||
import javax.security.auth.x500.X500Principal
|
||||
import kotlin.test.assertNotEquals
|
||||
import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class InteractiveShellIntegrationTest {
|
||||
@ -47,6 +62,13 @@ class InteractiveShellIntegrationTest {
|
||||
|
||||
private val testName = X500Principal("CN=Test,O=R3 Ltd,L=London,C=GB")
|
||||
|
||||
private lateinit var inputObjectMapper: ObjectMapper
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
inputObjectMapper = objectMapperWithClassLoader(InteractiveShell.getCordappsClassloader())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `shell should not log in with invalid credentials`() {
|
||||
val user = User("u", "p", setOf())
|
||||
@ -372,6 +394,53 @@ class InteractiveShellIntegrationTest {
|
||||
}
|
||||
assertThat(successful).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `dumpCheckpoints creates zip with json file for suspended flow`() {
|
||||
val user = User("u", "p", setOf(all()))
|
||||
driver(DriverParameters(notarySpecs = emptyList())) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), startInSameProcess = true).getOrThrow()
|
||||
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user), startInSameProcess = true).getOrThrow()
|
||||
bobNode.stop()
|
||||
|
||||
// create logs directory since the driver is not creating it
|
||||
(aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).toFile().mkdir()
|
||||
|
||||
val conf = ShellConfiguration(commandsDirectory = Files.createTempDir().toPath(),
|
||||
user = user.username, password = user.password,
|
||||
hostAndPort = aliceNode.rpcAddress)
|
||||
InteractiveShell.startShell(conf)
|
||||
// setup and configure some mocks required by InteractiveShell.runFlowByNameFragment()
|
||||
val output = mock<RenderPrintWriter> {
|
||||
on { println(any<String>()) } doAnswer {
|
||||
val line = it.arguments[0]
|
||||
assertNotEquals("Please try 'man run' to learn what syntax is acceptable", line)
|
||||
}
|
||||
}
|
||||
|
||||
aliceNode.rpc.startFlow(::SendFlow, bobNode.nodeInfo.singleIdentity())
|
||||
|
||||
InteractiveShell.runRPCFromString(
|
||||
listOf("dumpCheckpoints"), output, mock(), aliceNode.rpc as InternalCordaRPCOps, inputObjectMapper)
|
||||
|
||||
// assert that the checkpoint dump zip has been created
|
||||
val zip = (aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list()
|
||||
.find { it.toString().contains("checkpoints_dump-") }
|
||||
assertNotNull(zip)
|
||||
// assert that a json file has been created for the suspended flow
|
||||
val json = ZipFile((zip!!).toFile()).entries().asSequence()
|
||||
.find { it.name.contains(SendFlow::class.simpleName!!) }
|
||||
assertNotNull(json)
|
||||
}
|
||||
}
|
||||
|
||||
private fun objectMapperWithClassLoader(classLoader: ClassLoader?): ObjectMapper {
|
||||
val objectMapper = JacksonSupport.createNonRpcMapper()
|
||||
val tf = TypeFactory.defaultInstance().withClassLoader(classLoader)
|
||||
objectMapper.typeFactory = tf
|
||||
|
||||
return objectMapper
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNUSED")
|
||||
@ -399,4 +468,25 @@ class BurbleFlow : FlowLogic<Unit>() {
|
||||
override fun call() {
|
||||
println("NO OP! (Burble)")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class SendFlow(private val party: Party) : FlowLogic<Unit>() {
|
||||
override val progressTracker = ProgressTracker()
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
initiateFlow(party).sendAndReceive<String>("hi").unwrap { it }
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(SendFlow::class)
|
||||
class ReceiveFlow(private val session: FlowSession) : FlowLogic<Unit>() {
|
||||
override val progressTracker = ProgressTracker()
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
session.receive<String>().unwrap { it }
|
||||
session.send("hi")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package net.corda.tools.shell;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import net.corda.client.jackson.StringToMethodCallParser;
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps;
|
||||
import net.corda.core.messaging.CordaRPCOps;
|
||||
import org.crsh.cli.Argument;
|
||||
import org.crsh.cli.Command;
|
||||
@ -13,10 +14,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import static java.util.Comparator.comparing;
|
||||
|
||||
@ -38,21 +36,31 @@ public class RunShellCommand extends InteractiveShellCommand {
|
||||
public Object main(InvocationContext<Map> context,
|
||||
@Usage("The command to run") @Argument(unquote = false) List<String> command) {
|
||||
logger.info("Executing command \"run {}\",", (command != null) ? String.join(" ", command) : "<no arguments>");
|
||||
StringToMethodCallParser<CordaRPCOps> parser = new StringToMethodCallParser<>(CordaRPCOps.class, objectMapper(InteractiveShell.getCordappsClassloader()));
|
||||
|
||||
if (command == null) {
|
||||
emitHelp(context, parser);
|
||||
emitHelp(context);
|
||||
return null;
|
||||
}
|
||||
|
||||
return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper(InteractiveShell.getCordappsClassloader()));
|
||||
}
|
||||
|
||||
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {
|
||||
// Sends data down the pipeline about what commands are available. CRaSH will render it nicely.
|
||||
// Each element we emit is a map of column -> content.
|
||||
Set<Map.Entry<String, String>> entries = parser.getAvailableCommands().entrySet();
|
||||
List<Map.Entry<String, String>> entryList = new ArrayList<>(entries);
|
||||
private void emitHelp(InvocationContext<Map> context) {
|
||||
// to handle the lack of working inheritance in [StringToMethodCallParser] two parsers are used
|
||||
StringToMethodCallParser<CordaRPCOps> cordaRpcOpsParser =
|
||||
new StringToMethodCallParser<>(
|
||||
CordaRPCOps.class, objectMapper(InteractiveShell.getCordappsClassloader()));
|
||||
StringToMethodCallParser<InternalCordaRPCOps> internalCordaRpcOpsParser =
|
||||
new StringToMethodCallParser<>(
|
||||
InternalCordaRPCOps.class, objectMapper(InteractiveShell.getCordappsClassloader()));
|
||||
|
||||
// Sends data down the pipeline about what commands are available. CRaSH will render it nicely.
|
||||
// Each element we emit is a map of column -> content.
|
||||
Set<Map.Entry<String, String>> entries = cordaRpcOpsParser.getAvailableCommands().entrySet();
|
||||
Set<Map.Entry<String, String>> internalEntries = internalCordaRpcOpsParser.getAvailableCommands().entrySet();
|
||||
Set<Map.Entry<String, String>> entrySet = new HashSet<>(entries);
|
||||
entrySet.addAll(internalEntries);
|
||||
List<Map.Entry<String, String>> entryList = new ArrayList<>(entrySet);
|
||||
entryList.sort(comparing(Map.Entry::getKey));
|
||||
for (Map.Entry<String, String> entry : entryList) {
|
||||
// Skip these entries as they aren't really interesting for the user.
|
||||
|
@ -1,13 +1,13 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.crsh.auth.AuthInfo
|
||||
import org.crsh.auth.AuthenticationPlugin
|
||||
import org.crsh.plugin.CRaSHPlugin
|
||||
|
||||
class CordaAuthenticationPlugin(private val rpcOps: (username: String, credential: String) -> CordaRPCOps) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
|
||||
class CordaAuthenticationPlugin(private val rpcOps: (username: String, credential: String) -> InternalCordaRPCOps) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
|
||||
|
||||
companion object {
|
||||
private val logger = loggerFor<CordaAuthenticationPlugin>()
|
||||
|
@ -1,12 +1,12 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.tools.shell.InteractiveShell.createYamlInputMapper
|
||||
import net.corda.tools.shell.utlities.ANSIProgressRenderer
|
||||
import org.crsh.auth.AuthInfo
|
||||
|
||||
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null, val isSsh: Boolean = false) : AuthInfo {
|
||||
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: InternalCordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null, val isSsh: Boolean = false) : AuthInfo {
|
||||
override fun isSuccessful(): Boolean = successful
|
||||
|
||||
val yamlInputMapper: ObjectMapper by lazy {
|
||||
|
@ -23,6 +23,7 @@ import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.tools.shell.utlities.ANSIProgressRenderer
|
||||
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
|
||||
@ -73,8 +74,8 @@ import kotlin.concurrent.thread
|
||||
|
||||
object InteractiveShell {
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
private lateinit var rpcOps: (username: String, password: String) -> CordaRPCOps
|
||||
private lateinit var ops: CordaRPCOps
|
||||
private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps
|
||||
private lateinit var ops: InternalCordaRPCOps
|
||||
private lateinit var rpcConn: AutoCloseable
|
||||
private var shell: Shell? = null
|
||||
private var classLoader: ClassLoader? = null
|
||||
@ -104,7 +105,7 @@ object InteractiveShell {
|
||||
classLoader = classLoader)
|
||||
val connection = client.start(username, password)
|
||||
rpcConn = connection
|
||||
connection.proxy
|
||||
connection.proxy as InternalCordaRPCOps
|
||||
}
|
||||
}
|
||||
_startShell(configuration, classLoader)
|
||||
@ -488,7 +489,7 @@ object InteractiveShell {
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps,
|
||||
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: InternalCordaRPCOps,
|
||||
inputObjectMapper: ObjectMapper): Any? {
|
||||
val cmd = input.joinToString(" ").trim { it <= ' ' }
|
||||
if (cmd.startsWith("startflow", ignoreCase = true)) {
|
||||
@ -504,7 +505,7 @@ object InteractiveShell {
|
||||
var result: Any? = null
|
||||
try {
|
||||
InputStreamSerializer.invokeContext = context
|
||||
val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper)
|
||||
val parser = StringToMethodCallParser(InternalCordaRPCOps::class.java, inputObjectMapper)
|
||||
val call = parser.parse(cordaRPCOps, cmd)
|
||||
result = call.call()
|
||||
if (result != null && result !== kotlin.Unit && result !is Void) {
|
||||
|
@ -1,20 +1,20 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Proxy
|
||||
|
||||
fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -> CordaRPCOps, username: String, credential: String): CordaRPCOps {
|
||||
val cordaRPCOps: CordaRPCOps by lazy {
|
||||
fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -> InternalCordaRPCOps, username: String, credential: String): InternalCordaRPCOps {
|
||||
val cordaRPCOps: InternalCordaRPCOps by lazy {
|
||||
getCordaRPCOps(username, credential)
|
||||
}
|
||||
|
||||
return Proxy.newProxyInstance(CordaRPCOps::class.java.classLoader, arrayOf(CordaRPCOps::class.java), { _, method, args ->
|
||||
return Proxy.newProxyInstance(InternalCordaRPCOps::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java)) { _, method, args ->
|
||||
try {
|
||||
method.invoke(cordaRPCOps, *(args ?: arrayOf()))
|
||||
} catch (e: InvocationTargetException) {
|
||||
// Unpack exception.
|
||||
throw e.targetException
|
||||
}
|
||||
}) as CordaRPCOps
|
||||
} as InternalCordaRPCOps
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.FlowProgressHandleImpl
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -41,7 +41,7 @@ import kotlin.test.assertFailsWith
|
||||
|
||||
class InteractiveShellTest {
|
||||
lateinit var inputObjectMapper: ObjectMapper
|
||||
lateinit var cordaRpcOps: CordaRPCOps
|
||||
lateinit var cordaRpcOps: InternalCordaRPCOps
|
||||
lateinit var invocationContext: InvocationContext<Map<Any, Any>>
|
||||
lateinit var printWriter: RenderPrintWriter
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user