mirror of
https://github.com/corda/corda.git
synced 2025-06-21 08:40:03 +00:00
CORDA-3959: Eliminate InternalCordaRPCOps (#6600)
* CORDA-3959: Make `ThreadContextAdjustingRpcOpsProxy` flexible for `RPCOps` it uses * CORDA-3959: More changes towards supporting multiple `RPCOps` implementations * CORDA-3959: Detekt baseline update * CORDA-3959: Integration test compilation fix * CORDA-3959: Introduce `CheckpointRPCOpsImpl` and wire it on * CORDA-3959: Use multiple RPCOps interfaces in the shell commands * CORDA-3959: Detekt baseline update * CORDA-3959: Update RPCPermissionsTests * CORDA-3959: Update RPCSecurityManagerTest * CORDA-3959: Remove deprecated marker and rename the property * CORDA-3959: Detekt baseline * CORDA-3959: Introduce AttachmentTrustInfoRPCOpsImpl and wire it on * CORDA-3959: Delete `InternalCordaRPCOps` * CORDA-3959: Detekt baseline update * CORDA-3959: Rename `CheckpointRPCOps` to `FlowManagerRPCOps`
This commit is contained in:
@ -21,7 +21,6 @@ import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.inputStream
|
||||
import net.corda.core.internal.list
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.startFlow
|
||||
@ -49,6 +48,7 @@ import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.NodeHandleInternal
|
||||
import net.corda.testing.driver.internal.checkpoint.CheckpointRpcHelper.checkpointsRpc
|
||||
import net.corda.testing.internal.useSslRpcOverrides
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
@ -299,7 +299,7 @@ class InteractiveShellIntegrationTest {
|
||||
(alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).createDirectories()
|
||||
alice.rpc.startFlow(::ExternalOperationFlow)
|
||||
ExternalOperation.lock.acquire()
|
||||
InteractiveShell.runDumpCheckpoints(alice.rpc as InternalCordaRPCOps)
|
||||
alice.checkpointsRpc.use { InteractiveShell.runDumpCheckpoints(it) }
|
||||
ExternalOperation.lock2.release()
|
||||
|
||||
val zipFile = (alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
|
||||
@ -322,7 +322,7 @@ class InteractiveShellIntegrationTest {
|
||||
(alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).createDirectories()
|
||||
alice.rpc.startFlow(::ExternalAsyncOperationFlow)
|
||||
ExternalAsyncOperation.lock.acquire()
|
||||
InteractiveShell.runDumpCheckpoints(alice.rpc as InternalCordaRPCOps)
|
||||
alice.checkpointsRpc.use { InteractiveShell.runDumpCheckpoints(it) }
|
||||
ExternalAsyncOperation.future.complete(null)
|
||||
val zipFile = (alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
|
||||
val json = ZipInputStream(zipFile.inputStream()).use { zip ->
|
||||
@ -350,7 +350,7 @@ class InteractiveShellIntegrationTest {
|
||||
assertThrows<TimeoutException> {
|
||||
alice.rpc.startFlow(::WaitForStateConsumptionFlow, stateRefs).returnValue.getOrThrow(10.seconds)
|
||||
}
|
||||
InteractiveShell.runDumpCheckpoints(alice.rpc as InternalCordaRPCOps)
|
||||
alice.checkpointsRpc.use { InteractiveShell.runDumpCheckpoints(it) }
|
||||
val zipFile = (alice.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
|
||||
val json = ZipInputStream(zipFile.inputStream()).use { zip ->
|
||||
zip.nextEntry
|
||||
@ -390,7 +390,7 @@ class InteractiveShellIntegrationTest {
|
||||
Thread.sleep(5000)
|
||||
|
||||
mockRenderPrintWriter()
|
||||
InteractiveShell.runDumpCheckpoints(aliceNode.rpc as InternalCordaRPCOps)
|
||||
aliceNode.checkpointsRpc.use { InteractiveShell.runDumpCheckpoints(it) }
|
||||
|
||||
val zipFile = (aliceNode.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME).list().first { "checkpoints_dump-" in it.toString() }
|
||||
val json = ZipInputStream(zipFile.inputStream()).use { zip ->
|
||||
|
@ -1,14 +1,22 @@
|
||||
package net.corda.tools.shell;
|
||||
|
||||
import net.corda.core.internal.messaging.AttachmentTrustInfoRPCOps;
|
||||
import org.crsh.cli.Command;
|
||||
import org.crsh.cli.Man;
|
||||
import org.crsh.cli.Named;
|
||||
import org.crsh.cli.Usage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import static net.corda.tools.shell.InteractiveShell.runAttachmentTrustInfoView;
|
||||
|
||||
@Named("attachments")
|
||||
public class AttachmentShellCommand extends InteractiveShellCommand {
|
||||
public class AttachmentShellCommand extends InteractiveShellCommand<AttachmentTrustInfoRPCOps> {
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public Class<AttachmentTrustInfoRPCOps> getRpcOpsClass() {
|
||||
return AttachmentTrustInfoRPCOps.class;
|
||||
}
|
||||
|
||||
@Command
|
||||
@Man("Displays the trusted CorDapp attachments that have been manually installed or received over the network")
|
||||
|
@ -1,14 +1,22 @@
|
||||
package net.corda.tools.shell;
|
||||
|
||||
import net.corda.core.internal.messaging.FlowManagerRPCOps;
|
||||
import org.crsh.cli.Command;
|
||||
import org.crsh.cli.Man;
|
||||
import org.crsh.cli.Named;
|
||||
import org.crsh.cli.Usage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import static net.corda.tools.shell.InteractiveShell.*;
|
||||
|
||||
@Named("checkpoints")
|
||||
public class CheckpointShellCommand extends InteractiveShellCommand {
|
||||
public class CheckpointShellCommand extends InteractiveShellCommand<FlowManagerRPCOps> {
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public Class<FlowManagerRPCOps> getRpcOpsClass() {
|
||||
return FlowManagerRPCOps.class;
|
||||
}
|
||||
|
||||
@Command
|
||||
@Man("Outputs the contents of all checkpoints as json to be manually reviewed")
|
||||
|
@ -27,7 +27,7 @@ import static net.corda.tools.shell.InteractiveShell.runStateMachinesView;
|
||||
"flow constructors (the right one is picked automatically) are then specified using the same syntax as for the run command."
|
||||
)
|
||||
@Named("flow")
|
||||
public class FlowShellCommand extends InteractiveShellCommand {
|
||||
public class FlowShellCommand extends CordaRpcOpsShellCommand {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FlowShellCommand.class);
|
||||
|
||||
|
@ -19,7 +19,7 @@ import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
@Named("hashLookup")
|
||||
public class HashLookupShellCommand extends InteractiveShellCommand {
|
||||
public class HashLookupShellCommand extends CordaRpcOpsShellCommand {
|
||||
private static Logger logger = LoggerFactory.getLogger(HashLookupShellCommand.class);
|
||||
private static final String manualText ="Checks if a transaction matching a specified Id hash value is recorded on this node.\n\n" +
|
||||
"Both the transaction Id and the hashed value of a transaction Id (as returned by the Notary in case of a double-spend) is a valid input.\n" +
|
||||
|
@ -18,7 +18,7 @@ import java.util.Map;
|
||||
@Man("Allows you to see and update the format that's currently used for the commands' output.")
|
||||
@Usage("Allows you to see and update the format that's currently used for the commands' output.")
|
||||
@Named("output-format")
|
||||
public class OutputFormatCommand extends InteractiveShellCommand {
|
||||
public class OutputFormatCommand extends CordaRpcOpsShellCommand {
|
||||
|
||||
public OutputFormatCommand() {}
|
||||
|
||||
|
@ -25,7 +25,7 @@ import static java.util.Comparator.comparing;
|
||||
// is the closest you can get in Kotlin to raw types.
|
||||
|
||||
@Named("run")
|
||||
public class RunShellCommand extends InteractiveShellCommand {
|
||||
public class RunShellCommand extends CordaRpcOpsShellCommand {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RunShellCommand.class);
|
||||
|
||||
|
@ -0,0 +1,8 @@
|
||||
package net.corda.tools.shell;
|
||||
|
||||
import net.corda.core.messaging.RPCOps;
|
||||
import org.crsh.auth.AuthInfo;
|
||||
|
||||
public interface SshAuthInfo extends AuthInfo {
|
||||
<T extends RPCOps> T getOrCreateRpcOps(Class<T> rpcOpsClass);
|
||||
}
|
@ -13,7 +13,7 @@ import java.util.*;
|
||||
import static java.util.stream.Collectors.joining;
|
||||
|
||||
@Named("start")
|
||||
public class StartShellCommand extends InteractiveShellCommand {
|
||||
public class StartShellCommand extends CordaRpcOpsShellCommand {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(StartShellCommand.class);
|
||||
|
||||
|
@ -1,14 +1,13 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.crsh.auth.AuthInfo
|
||||
import org.crsh.auth.AuthenticationPlugin
|
||||
import org.crsh.plugin.CRaSHPlugin
|
||||
|
||||
class CordaAuthenticationPlugin(private val makeRPCConnection: (username: String, credential: String) -> CordaRPCConnection) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
|
||||
internal class CordaAuthenticationPlugin(private val rpcOpsProducer: RPCOpsProducer) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
|
||||
|
||||
companion object {
|
||||
private val logger = loggerFor<CordaAuthenticationPlugin>()
|
||||
@ -24,9 +23,10 @@ class CordaAuthenticationPlugin(private val makeRPCConnection: (username: String
|
||||
return AuthInfo.UNSUCCESSFUL
|
||||
}
|
||||
try {
|
||||
val connection = makeRPCConnection(username, credential)
|
||||
val ops = connection.proxy as InternalCordaRPCOps
|
||||
return CordaSSHAuthInfo(true, ops, isSsh = true, rpcConn = connection)
|
||||
val cordaSSHAuthInfo = CordaSSHAuthInfo(rpcOpsProducer, username, credential, isSsh = true)
|
||||
// We cannot guarantee authentication happened successfully till `RCPClient` session been established, hence doing a dummy call
|
||||
cordaSSHAuthInfo.getOrCreateRpcOps(CordaRPCOps::class.java).protocolVersion
|
||||
return cordaSSHAuthInfo
|
||||
} catch (e: ActiveMQSecurityException) {
|
||||
logger.warn(e.message)
|
||||
} catch (e: Exception) {
|
||||
|
@ -8,6 +8,6 @@ class CordaDisconnectPlugin : CRaSHPlugin<DisconnectPlugin>(), DisconnectPlugin
|
||||
override fun getImplementation() = this
|
||||
|
||||
override fun onDisconnect(userName: String?, authInfo: AuthInfo?) {
|
||||
(authInfo as? CordaSSHAuthInfo)?.rpcConn?.forceClose()
|
||||
(authInfo as? CordaSSHAuthInfo)?.cleanUp()
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,22 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
|
||||
internal abstract class CordaRpcOpsShellCommand : InteractiveShellCommand<CordaRPCOps>() {
|
||||
override val rpcOpsClass: Class<out CordaRPCOps> = CordaRPCOps::class.java
|
||||
|
||||
fun objectMapper(classLoader: ClassLoader?): ObjectMapper {
|
||||
val om = createYamlInputMapper()
|
||||
if (classLoader != null) {
|
||||
om.typeFactory = TypeFactory.defaultInstance().withClassLoader(classLoader)
|
||||
}
|
||||
return om
|
||||
}
|
||||
|
||||
private fun createYamlInputMapper(): ObjectMapper {
|
||||
val rpcOps = ops()
|
||||
return InteractiveShell.createYamlInputMapper(rpcOps)
|
||||
}
|
||||
}
|
@ -1,17 +1,53 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.tools.shell.InteractiveShell.createYamlInputMapper
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import net.corda.client.rpc.RPCConnection
|
||||
import net.corda.core.internal.utilities.InvocationHandlerTemplate
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.tools.shell.utlities.ANSIProgressRenderer
|
||||
import org.crsh.auth.AuthInfo
|
||||
import java.lang.reflect.Proxy
|
||||
|
||||
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: InternalCordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null,
|
||||
val isSsh: Boolean = false, val rpcConn: CordaRPCConnection? = null) : AuthInfo {
|
||||
override fun isSuccessful(): Boolean = successful
|
||||
internal class CordaSSHAuthInfo(private val rpcOpsProducer: RPCOpsProducer,
|
||||
private val username: String, private val credential: String, val ansiProgressRenderer: ANSIProgressRenderer? = null,
|
||||
val isSsh: Boolean = false) : SshAuthInfo {
|
||||
override fun isSuccessful(): Boolean = true
|
||||
|
||||
val yamlInputMapper: ObjectMapper by lazy {
|
||||
createYamlInputMapper(rpcOps)
|
||||
/**
|
||||
* It is necessary to have a cache to prevent creation of too many proxies for the same class. Proxy ensures that RPC connections gracefully
|
||||
* closed when cache entry is eliminated
|
||||
*/
|
||||
private val proxiesCache = Caffeine.newBuilder()
|
||||
.maximumSize(10)
|
||||
.removalListener(RemovalListener<Class<out RPCOps>, Pair<RPCOps, RPCConnection<RPCOps>>> { _, value, _ -> value?.second?.close() })
|
||||
.executor(MoreExecutors.directExecutor())
|
||||
.build(CacheLoader<Class<out RPCOps>, Pair<RPCOps, RPCConnection<RPCOps>>> { key -> createRpcOps(key) })
|
||||
|
||||
override fun <T : RPCOps> getOrCreateRpcOps(rpcOpsClass: Class<T>): T {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return proxiesCache.get(rpcOpsClass)!!.first as T
|
||||
}
|
||||
|
||||
fun cleanUp() {
|
||||
proxiesCache.asMap().forEach {
|
||||
proxiesCache.invalidate(it.key)
|
||||
it.value.second.forceClose()
|
||||
}
|
||||
}
|
||||
|
||||
private fun <T : RPCOps> createRpcOps(rpcOpsClass: Class<out T>): Pair<T, RPCConnection<T>> {
|
||||
val producerResult = rpcOpsProducer(username, credential, rpcOpsClass)
|
||||
val anotherProxy = proxyRPCOps(producerResult.proxy, rpcOpsClass)
|
||||
return anotherProxy to producerResult
|
||||
}
|
||||
|
||||
private fun <T : RPCOps> proxyRPCOps(instance: T, rpcOpsClass: Class<out T>): T {
|
||||
require(rpcOpsClass.isInterface) { "$rpcOpsClass must be an interface" }
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), object : InvocationHandlerTemplate {
|
||||
override val delegate = instance
|
||||
}) as T
|
||||
}
|
||||
}
|
@ -9,11 +9,8 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator
|
||||
import net.corda.client.jackson.JacksonSupport
|
||||
import net.corda.client.jackson.StringToMethodCallParser
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.client.rpc.GracefulReconnect
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.client.rpc.RPCConnection
|
||||
import net.corda.client.rpc.internal.RPCUtils.isShutdownMethodName
|
||||
import net.corda.client.rpc.notUsed
|
||||
import net.corda.core.CordaException
|
||||
@ -27,7 +24,8 @@ import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.internal.messaging.AttachmentTrustInfoRPCOps
|
||||
import net.corda.core.internal.messaging.FlowManagerRPCOps
|
||||
import net.corda.core.internal.packageName_
|
||||
import net.corda.core.internal.rootCause
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
@ -72,11 +70,10 @@ import java.lang.reflect.ParameterizedType
|
||||
import java.lang.reflect.Type
|
||||
import java.lang.reflect.UndeclaredThrowableException
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
import java.util.Properties
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Future
|
||||
import kotlin.collections.ArrayList
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
// TODO: Add command history.
|
||||
@ -95,9 +92,9 @@ const val STANDALONE_SHELL_PERMISSION = "ALL"
|
||||
@Suppress("MaxLineLength")
|
||||
object InteractiveShell {
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
private lateinit var makeRPCConnection: (username: String, password: String) -> CordaRPCConnection
|
||||
private lateinit var ops: InternalCordaRPCOps
|
||||
private lateinit var rpcConn: CordaRPCConnection
|
||||
private lateinit var rpcOpsProducer: RPCOpsProducer
|
||||
private lateinit var startupValidation: Lazy<CordaRPCOps>
|
||||
private var rpcConn: RPCConnection<CordaRPCOps>? = null
|
||||
private var shell: Shell? = null
|
||||
private var classLoader: ClassLoader? = null
|
||||
private lateinit var shellConfiguration: ShellConfiguration
|
||||
@ -113,26 +110,7 @@ object InteractiveShell {
|
||||
}
|
||||
|
||||
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
|
||||
makeRPCConnection = { username: String, password: String ->
|
||||
val connection = if (standalone) {
|
||||
CordaRPCClient(
|
||||
configuration.hostAndPort,
|
||||
configuration.ssl,
|
||||
classLoader
|
||||
).start(username, password, gracefulReconnect = GracefulReconnect())
|
||||
} else {
|
||||
CordaRPCClient(
|
||||
hostAndPort = configuration.hostAndPort,
|
||||
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
maxReconnectAttempts = 1
|
||||
),
|
||||
sslConfiguration = configuration.ssl,
|
||||
classLoader = classLoader
|
||||
).start(username, password)
|
||||
}
|
||||
rpcConn = connection
|
||||
connection
|
||||
}
|
||||
rpcOpsProducer = DefaultRPCOpsProducer(configuration, classLoader, standalone)
|
||||
launchShell(configuration, standalone, classLoader)
|
||||
}
|
||||
|
||||
@ -253,7 +231,7 @@ object InteractiveShell {
|
||||
// Don't use the Java language plugin (we may not have tools.jar available at runtime), this
|
||||
// will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that
|
||||
// is only the 'jmx' command.
|
||||
return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(makeRPCConnection) +
|
||||
return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOpsProducer) +
|
||||
CordaDisconnectPlugin()
|
||||
}
|
||||
}
|
||||
@ -262,15 +240,20 @@ object InteractiveShell {
|
||||
context.refresh()
|
||||
this.config = config
|
||||
start(context)
|
||||
val rpcOps = { username: String, password: String -> makeRPCConnection(username, password).proxy as InternalCordaRPCOps }
|
||||
ops = makeRPCOps(rpcOps, localUserName, localUserPassword)
|
||||
return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops,
|
||||
StdoutANSIProgressRenderer), shellSafety)
|
||||
startupValidation = lazy {
|
||||
rpcOpsProducer(localUserName, localUserPassword, CordaRPCOps::class.java).let {
|
||||
rpcConn = it
|
||||
it.proxy
|
||||
}
|
||||
}
|
||||
// For local shell create an artificial authInfo with super user permissions
|
||||
val authInfo = CordaSSHAuthInfo(rpcOpsProducer, localUserName, localUserPassword, StdoutANSIProgressRenderer)
|
||||
return context.getPlugin(ShellFactory::class.java).create(null, authInfo, shellSafety)
|
||||
}
|
||||
}
|
||||
|
||||
fun nodeInfo() = try {
|
||||
ops.nodeInfo()
|
||||
startupValidation.value.nodeInfo()
|
||||
} catch (e: UndeclaredThrowableException) {
|
||||
throw e.cause ?: e
|
||||
}
|
||||
@ -572,13 +555,13 @@ object InteractiveShell {
|
||||
@JvmStatic
|
||||
fun runAttachmentTrustInfoView(
|
||||
out: RenderPrintWriter,
|
||||
rpcOps: InternalCordaRPCOps
|
||||
rpcOps: AttachmentTrustInfoRPCOps
|
||||
): Any {
|
||||
return AttachmentTrustTable(out, rpcOps.attachmentTrustInfos)
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun runDumpCheckpoints(rpcOps: InternalCordaRPCOps) {
|
||||
fun runDumpCheckpoints(rpcOps: FlowManagerRPCOps) {
|
||||
rpcOps.dumpCheckpoints()
|
||||
}
|
||||
|
||||
@ -682,7 +665,7 @@ object InteractiveShell {
|
||||
latch.await()
|
||||
// Unsubscribe or we hold up the shutdown
|
||||
subscription.unsubscribe()
|
||||
rpcConn.forceClose()
|
||||
rpcConn?.forceClose()
|
||||
onExit.invoke()
|
||||
} catch (e: InterruptedException) {
|
||||
// Cancelled whilst draining flows. So let's carry on from here
|
||||
@ -690,7 +673,7 @@ object InteractiveShell {
|
||||
display { println("...cancelled clean shutdown.") }
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
display { println("RPC failed: ${e.rootCause}", Color.red) }
|
||||
display { println("RPC failed: ${e.rootCause}", Decoration.bold, Color.red) }
|
||||
} finally {
|
||||
InputStreamSerializer.invokeContext = null
|
||||
InputStreamDeserializer.closeAll()
|
||||
|
@ -1,23 +1,24 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import org.crsh.command.BaseCommand
|
||||
import org.crsh.shell.impl.command.CRaSHSession
|
||||
|
||||
/**
|
||||
* Simply extends CRaSH BaseCommand to add easy access to the RPC ops class.
|
||||
*/
|
||||
open class InteractiveShellCommand : BaseCommand() {
|
||||
fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps
|
||||
fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer
|
||||
fun objectMapper(classLoader: ClassLoader?): ObjectMapper {
|
||||
val om = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).yamlInputMapper
|
||||
if (classLoader != null) {
|
||||
om.typeFactory = TypeFactory.defaultInstance().withClassLoader(classLoader)
|
||||
}
|
||||
return om
|
||||
internal abstract class InteractiveShellCommand<T : RPCOps> : BaseCommand() {
|
||||
|
||||
abstract val rpcOpsClass: Class<out T>
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun ops(): T {
|
||||
val cRaSHSession = context.session as CRaSHSession
|
||||
val authInfo = cRaSHSession.authInfo as SshAuthInfo
|
||||
return authInfo.getOrCreateRpcOps(rpcOpsClass)
|
||||
}
|
||||
|
||||
fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer
|
||||
|
||||
fun isSsh() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).isSsh
|
||||
}
|
||||
|
@ -0,0 +1,49 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.GracefulReconnect
|
||||
import net.corda.client.rpc.RPCConnection
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.RPCOps
|
||||
|
||||
internal interface RPCOpsProducer {
|
||||
/**
|
||||
* Returns [RPCConnection] of underlying proxy. Proxy can be obtained at any time by calling [RPCConnection.proxy]
|
||||
*/
|
||||
operator fun <T : RPCOps> invoke(username: String?, credential: String?, rpcOpsClass: Class<T>) : RPCConnection<T>
|
||||
}
|
||||
|
||||
internal class DefaultRPCOpsProducer(private val configuration: ShellConfiguration, private val classLoader: ClassLoader? = null, private val standalone: Boolean) : RPCOpsProducer {
|
||||
|
||||
override fun <T : RPCOps> invoke(username: String?, credential: String?, rpcOpsClass: Class<T>): RPCConnection<T> {
|
||||
|
||||
return if (rpcOpsClass == CordaRPCOps::class.java) {
|
||||
// For CordaRPCOps we are using CordaRPCClient
|
||||
val connection = if (standalone) {
|
||||
CordaRPCClient(
|
||||
configuration.hostAndPort,
|
||||
configuration.ssl,
|
||||
classLoader
|
||||
).start(username!!, credential!!, gracefulReconnect = GracefulReconnect())
|
||||
} else {
|
||||
CordaRPCClient(
|
||||
hostAndPort = configuration.hostAndPort,
|
||||
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
maxReconnectAttempts = 1
|
||||
),
|
||||
sslConfiguration = configuration.ssl,
|
||||
classLoader = classLoader
|
||||
).start(username!!, credential!!)
|
||||
}
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
connection as RPCConnection<T>
|
||||
} else {
|
||||
// For other types "plain" RPCClient is used
|
||||
val rpcClient = RPCClient<T>(configuration.hostAndPort, configuration.ssl)
|
||||
val connection = rpcClient.start(rpcOpsClass, username!!, credential!!)
|
||||
connection
|
||||
}
|
||||
}
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
package net.corda.tools.shell
|
||||
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Proxy
|
||||
|
||||
fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -> InternalCordaRPCOps, username: String, credential: String): InternalCordaRPCOps {
|
||||
val cordaRPCOps: InternalCordaRPCOps by lazy {
|
||||
getCordaRPCOps(username, credential)
|
||||
}
|
||||
|
||||
return Proxy.newProxyInstance(InternalCordaRPCOps::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java)) { _, method, args ->
|
||||
try {
|
||||
method.invoke(cordaRPCOps, *(args ?: arrayOf()))
|
||||
} catch (e: InvocationTargetException) {
|
||||
// Unpack exception.
|
||||
throw e.targetException
|
||||
}
|
||||
} as InternalCordaRPCOps
|
||||
}
|
@ -18,7 +18,7 @@ import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.FlowProgressHandleImpl
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
@ -42,7 +42,7 @@ import kotlin.test.assertFailsWith
|
||||
|
||||
class InteractiveShellTest {
|
||||
lateinit var inputObjectMapper: ObjectMapper
|
||||
lateinit var cordaRpcOps: InternalCordaRPCOps
|
||||
lateinit var cordaRpcOps: CordaRPCOps
|
||||
lateinit var invocationContext: InvocationContext<Map<Any, Any>>
|
||||
lateinit var printWriter: RenderPrintWriter
|
||||
|
||||
|
Reference in New Issue
Block a user