ENT-6421 Public version of FlowManagerRPCOps (#7006)

A public version of `FlowManagerRPCOps` which does not live in an
internal package has been added. This new interface shares the same name
as the internal one.

Because of the name sharing, the internal version has been
`@Deprecated`.

`FlowManagerRPCOpsImpl` implements both the new and old interfaces. This
 allows for backwards compatibility, allowing old shells or clients to
 call the old interface on newer nodes without breaking.
This commit is contained in:
Dan Newton 2022-01-07 16:40:29 +00:00 committed by GitHub
parent df5604874b
commit 044202550d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 118 additions and 23 deletions

View File

@ -5,13 +5,20 @@ import net.corda.core.messaging.RPCOps
/**
* RPC operations to perform operations related to flows including management of associated persistent states like checkpoints.
*/
@Deprecated(
"A public version of this interface has been exposed that should be interacted with using the MultiRPCClient",
ReplaceWith("net.corda.core.messaging.flows.FlowManagerRPCOps")
)
interface FlowManagerRPCOps : RPCOps {
/**
* Dump all the current flow checkpoints as JSON into a zip file in the node's log directory.
*/
fun dumpCheckpoints()
/** Dump all the current flow checkpoints, alongside with the node's main jar, all CorDapps and driver jars
* into a zip file in the node's log directory. */
/**
* Dump all the current flow checkpoints, alongside with the node's main jar, all CorDapps and driver jars into a zip file in the node's
* log directory.
*/
fun debugCheckpoints()
}

View File

@ -0,0 +1,20 @@
package net.corda.core.messaging.flows
import net.corda.core.messaging.RPCOps
/**
* RPC operations to perform operations related to flows including management of associated persistent states like checkpoints.
*/
interface FlowManagerRPCOps : RPCOps {
/**
* Dump all the current flow checkpoints as JSON into a zip file in the node's log directory.
*/
fun dumpCheckpoints()
/**
* Dump all the current flow checkpoints, alongside with the node's main jar, all CorDapps and driver jars into a zip file in the node's
* log directory.
*/
fun debugCheckpoints()
}

View File

@ -0,0 +1,67 @@
@file:Suppress("DEPRECATION")
package net.corda.node.services.messaging
import net.corda.client.rpc.ext.MultiRPCClient
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import net.corda.core.messaging.flows.FlowManagerRPCOps
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.NodeStartup
import net.corda.node.services.Permissions
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.junit.Test
import kotlin.test.assertNotNull
import net.corda.core.internal.messaging.FlowManagerRPCOps as InternalFlowManagerRPCOps
class FlowManagerRPCOpsTest {
@Test(timeout = 300_000)
fun `net_corda_core_internal_messaging_FlowManagerRPCOps can be accessed using the MultiRPCClient`() {
val user = User("user", "password", setOf(Permissions.all()))
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val client = MultiRPCClient(nodeAHandle.rpcAddress, InternalFlowManagerRPCOps::class.java, user.username, user.password)
val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME
logDirPath.createDirectories()
client.use {
val rpcOps = it.start().getOrThrow(20.seconds).proxy
rpcOps.dumpCheckpoints()
it.stop()
}
assertNotNull(logDirPath.list().singleOrNull { it.isRegularFile() })
}
}
@Test(timeout = 300_000)
fun `net_corda_core_messaging_flows_FlowManagerRPCOps can be accessed using the MultiRPCClient`() {
val user = User("user", "password", setOf(Permissions.all()))
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val client = MultiRPCClient(nodeAHandle.rpcAddress, FlowManagerRPCOps::class.java, user.username, user.password)
val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME
logDirPath.createDirectories()
client.use {
val rpcOps = it.start().getOrThrow(20.seconds).proxy
rpcOps.dumpCheckpoints()
it.stop()
}
assertNotNull(logDirPath.list().singleOrNull { it.isRegularFile() })
}
}
}

View File

@ -39,13 +39,13 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.internal.messaging.AttachmentTrustInfoRPCOps
import net.corda.core.internal.messaging.FlowManagerRPCOps
import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.rootMessage
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.flows.FlowManagerRPCOps
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
@ -179,7 +179,8 @@ import java.sql.Savepoint
import java.time.Clock
import java.time.Duration
import java.time.format.DateTimeParseException
import java.util.*
import java.util.ArrayList
import java.util.Properties
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
@ -404,23 +405,21 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
/** The implementation of the [RPCOps] interfaces used by this node. */
@Suppress("DEPRECATION")
open fun makeRPCOps(cordappLoader: CordappLoader): List<RPCOps> {
val cordaRPCOpsImpl = Pair(CordaRPCOps::class.java, CordaRPCOpsImpl(
services,
smm,
flowStarter
) {
shutdownExecutor.submit(::stop)
}.also { it.closeOnStop() })
val cordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit(::stop) }
cordaRPCOps.closeOnStop()
val flowManagerRPCOps = FlowManagerRPCOpsImpl(checkpointDumper)
val attachmentTrustInfoRPCOps = AttachmentTrustInfoRPCOpsImpl(services.attachmentTrustCalculator)
val checkpointRPCOpsImpl = Pair(FlowManagerRPCOps::class.java, FlowManagerRPCOpsImpl(checkpointDumper))
val attachmentTrustInfoRPCOps = Pair(AttachmentTrustInfoRPCOps::class.java, AttachmentTrustInfoRPCOpsImpl(services.attachmentTrustCalculator))
return listOf(cordaRPCOpsImpl, checkpointRPCOpsImpl, attachmentTrustInfoRPCOps).map { rpcOpsImplPair ->
return listOf(
CordaRPCOps::class.java to cordaRPCOps,
FlowManagerRPCOps::class.java to flowManagerRPCOps,
net.corda.core.internal.messaging.FlowManagerRPCOps::class.java to flowManagerRPCOps,
AttachmentTrustInfoRPCOps::class.java to attachmentTrustInfoRPCOps
).map { (targetInterface, implementation) ->
// Mind that order of proxies is important
val targetInterface = rpcOpsImplPair.first
val stage1Proxy = AuthenticatedRpcOpsProxy.proxy(rpcOpsImplPair.second, targetInterface)
val stage1Proxy = AuthenticatedRpcOpsProxy.proxy(implementation, targetInterface)
val stage2Proxy = ThreadContextAdjustingRpcOpsProxy.proxy(stage1Proxy, targetInterface, cordappLoader.appClassLoader)
stage2Proxy

View File

@ -1,13 +1,15 @@
@file:Suppress("DEPRECATION")
package net.corda.node.internal.checkpoints
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.messaging.FlowManagerRPCOps
import net.corda.core.messaging.flows.FlowManagerRPCOps
import net.corda.node.services.rpc.CheckpointDumperImpl
import net.corda.core.internal.messaging.FlowManagerRPCOps as InternalFlowManagerRPCOps
/**
* Implementation of [FlowManagerRPCOps]
*/
internal class FlowManagerRPCOpsImpl(private val checkpointDumper: CheckpointDumperImpl) : FlowManagerRPCOps {
internal class FlowManagerRPCOpsImpl(private val checkpointDumper: CheckpointDumperImpl) : FlowManagerRPCOps, InternalFlowManagerRPCOps {
override val protocolVersion: Int = PLATFORM_VERSION

View File

@ -1,6 +1,6 @@
package net.corda.testing.driver.internal.checkpoint
import net.corda.core.internal.messaging.FlowManagerRPCOps
import net.corda.core.messaging.flows.FlowManagerRPCOps
import net.corda.testing.driver.NodeHandle
object CheckpointRpcHelper {

View File

@ -1,6 +1,6 @@
package net.corda.tools.shell;
import net.corda.core.internal.messaging.FlowManagerRPCOps;
import net.corda.core.messaging.flows.FlowManagerRPCOps;
import org.crsh.cli.Command;
import org.crsh.cli.Man;
import org.crsh.cli.Named;

View File

@ -25,7 +25,6 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.messaging.AttachmentTrustInfoRPCOps
import net.corda.core.internal.messaging.FlowManagerRPCOps
import net.corda.core.internal.packageName_
import net.corda.core.internal.rootCause
import net.corda.core.internal.uncheckedCast
@ -33,6 +32,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.flows.FlowManagerRPCOps
import net.corda.core.messaging.pendingFlowsCount
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer