mirror of
https://github.com/corda/corda.git
synced 2025-06-19 07:38:22 +00:00
Integrate CRaSH shell (SSHD). Joint effort between Mike Hearn and Marek Skocovsky.
The shell is embedded in the node and offers the ability to monitor and control the node via the launching terminal. Still to do: * Switch to a fork of CRaSH that we can maintain ourselves, and merge in Marek's SSH patch so we can enable SSH access. * Add persistent command history that survives restarts. * Tab completion for the 'flow' and 'run' commands. * Remove the 'jul' command and replace it with a command that lets you see and tail the log4j logs instead. * Fix or remove the other crash commands that have bitrotted since 2015.
This commit is contained in:
committed by
Mike Hearn
parent
c5966a93e5
commit
262c87a5c6
@ -2,6 +2,8 @@
|
||||
// must also be in the default package. When using Kotlin there are a whole host of exceptions
|
||||
// trying to construct this from Capsule, so it is written in Java.
|
||||
|
||||
import sun.misc.*;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.*;
|
||||
import java.util.*;
|
||||
@ -45,6 +47,17 @@ public class CordaCaplet extends Capsule {
|
||||
return classpath;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void liftoff() {
|
||||
super.liftoff();
|
||||
Signal.handle(new Signal("INT"), new SignalHandler() {
|
||||
@Override
|
||||
public void handle(Signal signal) {
|
||||
// Disable Ctrl-C for this process, so the child process can handle it in the shell instead.
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Boolean isJAR(File file) {
|
||||
return file.getName().toLowerCase().endsWith(".jar");
|
||||
}
|
||||
|
@ -29,6 +29,8 @@ class ArgsParser {
|
||||
.withValuesConvertedBy(object : EnumConverter<Level>(Level::class.java) {})
|
||||
.defaultsTo(Level.INFO)
|
||||
private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
|
||||
private val sshdServerArg = optionParser.accepts("sshd", "Enables SSHD server for node administration.")
|
||||
private val noLocalShellArg = optionParser.accepts("no-local-shell", "Do not start the embedded shell locally.")
|
||||
private val isRegistrationArg = optionParser.accepts("initial-registration", "Start initial node registration with Corda network to obtain certificate from the permissioning server.")
|
||||
private val isVersionArg = optionParser.accepts("version", "Print the version and exit")
|
||||
private val helpArg = optionParser.accepts("help").forHelp()
|
||||
@ -45,7 +47,9 @@ class ArgsParser {
|
||||
val logToConsole = optionSet.has(logToConsoleArg)
|
||||
val isRegistration = optionSet.has(isRegistrationArg)
|
||||
val isVersion = optionSet.has(isVersionArg)
|
||||
return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isRegistration, isVersion)
|
||||
val noLocalShell = optionSet.has(noLocalShellArg)
|
||||
val sshdServer = optionSet.has(sshdServerArg)
|
||||
return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isRegistration, isVersion, noLocalShell, sshdServer)
|
||||
}
|
||||
|
||||
fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink)
|
||||
@ -57,7 +61,9 @@ data class CmdLineOptions(val baseDirectory: Path,
|
||||
val loggingLevel: Level,
|
||||
val logToConsole: Boolean,
|
||||
val isRegistration: Boolean,
|
||||
val isVersion: Boolean) {
|
||||
val isVersion: Boolean,
|
||||
val noLocalShell: Boolean,
|
||||
val sshdServer: Boolean) {
|
||||
fun loadConfig(allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
|
||||
return ConfigHelper.loadConfig(baseDirectory, configFile, allowMissingConfig, configOverrides)
|
||||
}
|
||||
|
@ -11,7 +11,6 @@ import net.corda.core.node.Version
|
||||
import net.corda.core.utilities.Emoji
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.utilities.ANSIProgressObserver
|
||||
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
|
||||
import net.corda.node.utilities.registration.NetworkRegistrationHelper
|
||||
import org.fusesource.jansi.Ansi
|
||||
@ -19,6 +18,7 @@ import org.fusesource.jansi.AnsiConsole
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.net.InetAddress
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
@ -78,7 +78,8 @@ fun main(args: Array<String>) {
|
||||
|
||||
drawBanner(nodeVersionInfo)
|
||||
|
||||
System.setProperty("log-path", (cmdlineOptions.baseDirectory / LOGS_DIRECTORY_NAME).toString())
|
||||
val dir: Path = cmdlineOptions.baseDirectory
|
||||
System.setProperty("log-path", (dir / "logs").toString())
|
||||
|
||||
val log = LoggerFactory.getLogger("Main")
|
||||
printBasicNodeInfo("Logs can be found in", System.getProperty("log-path"))
|
||||
@ -129,8 +130,11 @@ fun main(args: Array<String>) {
|
||||
val elapsed = (System.currentTimeMillis() - startTime) / 10 / 100.0
|
||||
printBasicNodeInfo("Node for \"${node.info.legalIdentity.name}\" started up and registered in $elapsed sec")
|
||||
|
||||
if (renderBasicInfoToConsole)
|
||||
ANSIProgressObserver(node.smm)
|
||||
// Don't start the shell if there's no console attached.
|
||||
val runShell = !cmdlineOptions.noLocalShell && System.console() != null
|
||||
node.startupComplete.thenAccept {
|
||||
InteractiveShell.startShell(dir, runShell, cmdlineOptions.sshdServer, node)
|
||||
}
|
||||
} failure {
|
||||
log.error("Error during network map registration", it)
|
||||
exitProcess(1)
|
||||
@ -230,4 +234,4 @@ private fun drawBanner(nodeVersionInfo: NodeVersionInfo) {
|
||||
newline().
|
||||
reset())
|
||||
}
|
||||
}
|
||||
}
|
360
node/src/main/kotlin/net/corda/node/InteractiveShell.kt
Normal file
360
node/src/main/kotlin/net/corda/node/InteractiveShell.kt
Normal file
@ -0,0 +1,360 @@
|
||||
package net.corda.node
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory
|
||||
import com.fasterxml.jackson.core.JsonGenerator
|
||||
import com.fasterxml.jackson.databind.JsonSerializer
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.databind.SerializerProvider
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
|
||||
import net.corda.core.div
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.utilities.Emoji
|
||||
import net.corda.jackson.JacksonSupport
|
||||
import net.corda.jackson.StringToMethodCallParser
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.ANSIProgressRenderer
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.CURRENT_RPC_USER
|
||||
import net.corda.nodeapi.User
|
||||
import org.crsh.command.InvocationContext
|
||||
import org.crsh.console.jline.JLineProcessor
|
||||
import org.crsh.console.jline.TerminalFactory
|
||||
import org.crsh.console.jline.console.ConsoleReader
|
||||
import org.crsh.shell.ShellFactory
|
||||
import org.crsh.standalone.Bootstrap
|
||||
import org.crsh.text.Color
|
||||
import org.crsh.text.RenderPrintWriter
|
||||
import org.crsh.util.InterruptHandler
|
||||
import org.crsh.util.Utils
|
||||
import org.crsh.vfs.FS
|
||||
import org.crsh.vfs.spi.file.FileMountFactory
|
||||
import org.crsh.vfs.spi.url.ClassPathMountFactory
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import java.io.FileDescriptor
|
||||
import java.io.FileInputStream
|
||||
import java.io.PrintWriter
|
||||
import java.lang.reflect.Constructor
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.logging.Level
|
||||
import java.util.logging.Logger
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
// TODO: Add command history.
|
||||
// TODO: Command completion.
|
||||
// TODO: Find a way to inject this directly into CRaSH as a command, without needing JIT source compilation.
|
||||
// TODO: Add serialisers for InputStream so attachments can be uploaded through the shell.
|
||||
// TODO: Do something sensible with commands that return a future.
|
||||
// TODO: Configure default renderers, send objects down the pipeline, add commands to do json/xml/yaml outputs.
|
||||
// TODO: Add a command to view last N lines/tail/control log4j2 loggers.
|
||||
// TODO: Review or fix the JVM commands which have bitrotted and some are useless.
|
||||
// TODO: Get rid of the 'java' command, it's kind of worthless.
|
||||
// TODO: Fix up the 'dashboard' command which has some rendering issues.
|
||||
// TODO: Resurrect or reimplement the mail plugin.
|
||||
// TODO: Make it notice new shell commands added after the node started.
|
||||
|
||||
object InteractiveShell {
|
||||
private lateinit var node: Node
|
||||
|
||||
/**
|
||||
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
|
||||
* internals.
|
||||
*/
|
||||
fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: Node) {
|
||||
this.node = node
|
||||
var runSSH = runSSHServer
|
||||
|
||||
Logger.getLogger("").level = Level.OFF // TODO: Is this really needed?
|
||||
|
||||
val classpathDriver = ClassPathMountFactory(Thread.currentThread().contextClassLoader)
|
||||
val fileDriver = FileMountFactory(Utils.getCurrentDirectory());
|
||||
|
||||
val extraCommandsPath = (dir / "shell-commands").toAbsolutePath()
|
||||
Files.createDirectories(extraCommandsPath)
|
||||
val commandsFS = FS.Builder()
|
||||
.register("file", fileDriver)
|
||||
.mount("file:" + extraCommandsPath)
|
||||
.register("classpath", classpathDriver)
|
||||
.mount("classpath:/net/corda/node/shell/")
|
||||
.mount("classpath:/crash/commands/")
|
||||
.build()
|
||||
// TODO: Re-point to our own conf path.
|
||||
val confFS = FS.Builder()
|
||||
.register("classpath", classpathDriver)
|
||||
.mount("classpath:/crash")
|
||||
.build()
|
||||
|
||||
val bootstrap = Bootstrap(Thread.currentThread().contextClassLoader, confFS, commandsFS)
|
||||
|
||||
val config = Properties()
|
||||
if (runSSH) {
|
||||
// TODO: Finish and enable SSH access.
|
||||
// This means bringing the CRaSH SSH plugin into the Corda tree and applying Marek's patches
|
||||
// found in https://github.com/marekdapps/crash/commit/8a37ce1c7ef4d32ca18f6396a1a9d9841f7ff643
|
||||
// to that local copy, as CRaSH is no longer well maintained by the upstream and the SSH plugin
|
||||
// that it comes with is based on a very old version of Apache SSHD which can't handle connections
|
||||
// from newer SSH clients. It also means hooking things up to the authentication system.
|
||||
printBasicNodeInfo("SSH server access is not fully implemented, sorry.")
|
||||
runSSH = false
|
||||
}
|
||||
|
||||
if (runSSH) {
|
||||
// Enable SSH access. Note: these have to be strings, even though raw object assignments also work.
|
||||
config["crash.ssh.keypath"] = (dir / "sshkey").toString()
|
||||
config["crash.ssh.keygen"] = "true"
|
||||
// config["crash.ssh.port"] = node.configuration.sshdAddress.port.toString()
|
||||
config["crash.auth"] = "simple"
|
||||
config["crash.auth.simple.username"] = "admin"
|
||||
config["crash.auth.simple.password"] = "admin"
|
||||
}
|
||||
|
||||
bootstrap.config = config
|
||||
bootstrap.setAttributes(mapOf(
|
||||
"node" to node,
|
||||
"services" to node.services,
|
||||
"ops" to node.rpcOps,
|
||||
"mapper" to shellObjectMapper
|
||||
))
|
||||
bootstrap.bootstrap()
|
||||
|
||||
// TODO: Automatically set up the JDBC sub-command with a connection to the database.
|
||||
|
||||
if (runSSH) {
|
||||
// printBasicNodeInfo("SSH server listening on address", node.configuration.sshdAddress.toString())
|
||||
}
|
||||
|
||||
// Possibly bring up a local shell in the launching terminal window, unless it's disabled.
|
||||
if (!runLocalShell)
|
||||
return
|
||||
val shell = bootstrap.context.getPlugin(ShellFactory::class.java).create(null)
|
||||
val terminal = TerminalFactory.create()
|
||||
val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal)
|
||||
val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out)
|
||||
InterruptHandler { jlineProcessor.interrupt() }.install()
|
||||
thread(name = "Command line shell processor", isDaemon = true) {
|
||||
// Give whoever has local shell access administrator access to the node.
|
||||
CURRENT_RPC_USER.set(User(ArtemisMessagingComponent.NODE_USER, "", setOf()))
|
||||
Emoji.renderIfSupported {
|
||||
jlineProcessor.run()
|
||||
}
|
||||
}
|
||||
thread(name = "Command line shell terminator", isDaemon = true) {
|
||||
// Wait for the shell to finish.
|
||||
jlineProcessor.closed()
|
||||
terminal.restore()
|
||||
node.stop()
|
||||
}
|
||||
}
|
||||
|
||||
val shellObjectMapper: ObjectMapper by lazy {
|
||||
// Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra
|
||||
// serializers.
|
||||
//
|
||||
// TODO: This should become the default renderer rather than something used specifically by commands.
|
||||
JacksonSupport.createInMemoryMapper(node.services.identityService, YAMLFactory())
|
||||
}
|
||||
|
||||
private object ObservableSerializer : JsonSerializer<Observable<*>>() {
|
||||
override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) {
|
||||
gen.writeString("(observable)")
|
||||
}
|
||||
}
|
||||
|
||||
private fun createOutputMapper(factory: JsonFactory): ObjectMapper {
|
||||
return JacksonSupport.createNonRpcMapper(factory).apply({
|
||||
// Register serializers for stateful objects from libraries that are special to the RPC system and don't
|
||||
// make sense to print out to the screen. For classes we own, annotations can be used instead.
|
||||
val rpcModule = SimpleModule("RPC module")
|
||||
rpcModule.addSerializer(Observable::class.java, ObservableSerializer)
|
||||
registerModule(rpcModule)
|
||||
|
||||
disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
|
||||
enable(SerializationFeature.INDENT_OUTPUT)
|
||||
})
|
||||
}
|
||||
|
||||
private val yamlMapper by lazy { createOutputMapper(YAMLFactory()) }
|
||||
private val jsonMapper by lazy { createOutputMapper(JsonFactory()) }
|
||||
|
||||
enum class RpcResponsePrintingFormat {
|
||||
yaml, json, tostring
|
||||
}
|
||||
|
||||
/**
|
||||
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
|
||||
* the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using
|
||||
* the [runFlowFromString] method and starts the requested flow using the [ANSIProgressRenderer] to draw
|
||||
* the progress tracker. Ctrl-C can be used to cancel.
|
||||
*/
|
||||
@JvmStatic
|
||||
fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter) {
|
||||
val matches = node.flowLogicFactory.flowWhitelist.keys.filter { nameFragment in it }
|
||||
if (matches.size > 1) {
|
||||
output.println("Ambigous name provided, please be more specific. Your options are:")
|
||||
matches.forEachIndexed { i, s -> output.println("${i+1}. $s", Color.yellow) }
|
||||
return
|
||||
}
|
||||
val match = matches.single()
|
||||
val clazz = Class.forName(match)
|
||||
if (!FlowLogic::class.java.isAssignableFrom(clazz))
|
||||
throw IllegalStateException("Found a non-FlowLogic class in the whitelist? $clazz")
|
||||
try {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val fsm = runFlowFromString({ node.services.startFlow(it) }, inputData, clazz as Class<FlowLogic<*>>)
|
||||
// Show the progress tracker on the console until the flow completes or is interrupted with a
|
||||
// Ctrl-C keypress.
|
||||
val latch = CountDownLatch(1)
|
||||
ANSIProgressRenderer.onDone = { latch.countDown() }
|
||||
ANSIProgressRenderer.progressTracker = (fsm as FlowStateMachineImpl).logic.progressTracker
|
||||
try {
|
||||
// Wait for the flow to end and the progress tracker to notice. By the time the latch is released
|
||||
// the tracker is done with the screen.
|
||||
latch.await()
|
||||
} catch(e: InterruptedException) {
|
||||
ANSIProgressRenderer.progressTracker = null
|
||||
// TODO: When the flow framework allows us to kill flows mid-flight, do so here.
|
||||
} catch(e: ExecutionException) {
|
||||
// It has already been logged by the framework code and printed by the ANSI progress renderer.
|
||||
}
|
||||
} catch(e: NoApplicableConstructor) {
|
||||
output.println("No matching constructor found:", Color.red)
|
||||
e.errors.forEach { output.println("- $it", Color.red) }
|
||||
}
|
||||
}
|
||||
|
||||
class NoApplicableConstructor(val errors: List<String>) : Exception() {
|
||||
override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
|
||||
}
|
||||
|
||||
// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.
|
||||
/**
|
||||
* Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts
|
||||
* the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable
|
||||
* to statically typed, compiled code.
|
||||
*
|
||||
* See the [StringToMethodCallParser] class to learn more about limitations and acceptable syntax.
|
||||
*
|
||||
* @throws NoApplicableConstructor if no constructor could be found for the given set of types.
|
||||
*/
|
||||
@Throws(NoApplicableConstructor::class)
|
||||
fun runFlowFromString(invoke: (FlowLogic<*>) -> FlowStateMachine<*>,
|
||||
inputData: String, clazz: Class<out FlowLogic<*>>,
|
||||
om: ObjectMapper = shellObjectMapper): FlowStateMachine<*> {
|
||||
// For each constructor, attempt to parse the input data as a method call. Use the first that succeeds,
|
||||
// and keep track of the reasons we failed so we can print them out if no constructors are usable.
|
||||
val parser = StringToMethodCallParser(clazz, om)
|
||||
val errors = ArrayList<String>()
|
||||
for (ctor in clazz.constructors) {
|
||||
var paramNamesFromConstructor: List<String>? = null
|
||||
fun getPrototype(ctor: Constructor<*>): List<String> {
|
||||
val argTypes = ctor.parameterTypes.map { it.simpleName }
|
||||
val prototype = paramNamesFromConstructor!!.zip(argTypes).map { pair ->
|
||||
val (name, type) = pair
|
||||
"$name: $type"
|
||||
}
|
||||
return prototype
|
||||
}
|
||||
try {
|
||||
// Attempt construction with the given arguments.
|
||||
paramNamesFromConstructor = parser.paramNamesFromConstructor(ctor)
|
||||
val args = parser.parseArguments(clazz.name, paramNamesFromConstructor.zip(ctor.parameterTypes), inputData)
|
||||
if (args.size != ctor.parameterTypes.size) {
|
||||
errors.add("${getPrototype(ctor)}: Wrong number of arguments (${args.size} provided, ${ctor.parameterTypes.size} needed)")
|
||||
continue
|
||||
}
|
||||
val flow = ctor.newInstance(*args) as FlowLogic<*>
|
||||
return invoke(flow)
|
||||
} catch(e: StringToMethodCallParser.UnparseableCallException.MissingParameter) {
|
||||
errors.add("${getPrototype(ctor)}: missing parameter ${e.paramName}")
|
||||
} catch(e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) {
|
||||
errors.add("${getPrototype(ctor)}: too many parameters")
|
||||
} catch(e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) {
|
||||
val argTypes = ctor.parameterTypes.map { it.simpleName }
|
||||
errors.add("$argTypes: <constructor missing parameter reflection data>")
|
||||
} catch(e: StringToMethodCallParser.UnparseableCallException) {
|
||||
val argTypes = ctor.parameterTypes.map { it.simpleName }
|
||||
errors.add("$argTypes: ${e.message}")
|
||||
}
|
||||
}
|
||||
throw NoApplicableConstructor(errors)
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
fun printAndFollowRPCResponse(outputFormat: RpcResponsePrintingFormat, response: Any?, toStream: PrintWriter): CompletableFuture<Unit>? {
|
||||
val printerFun = when (outputFormat) {
|
||||
RpcResponsePrintingFormat.yaml -> { obj: Any? -> yamlMapper.writeValueAsString(obj) }
|
||||
RpcResponsePrintingFormat.json -> { obj: Any? -> jsonMapper.writeValueAsString(obj) }
|
||||
RpcResponsePrintingFormat.tostring -> { obj: Any? -> Emoji.renderIfSupported { obj.toString() } }
|
||||
}
|
||||
toStream.println(printerFun(response))
|
||||
toStream.flush()
|
||||
return maybeFollow(response, printerFun, toStream)
|
||||
}
|
||||
|
||||
private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() {
|
||||
private var count = 0;
|
||||
val future = CompletableFuture<Unit>()
|
||||
|
||||
init {
|
||||
// The future is public and can be completed by something else to indicate we don't wish to follow
|
||||
// anymore (e.g. the user pressing Ctrl-C).
|
||||
future.thenAccept {
|
||||
if (!isUnsubscribed)
|
||||
unsubscribe()
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun onCompleted() {
|
||||
toStream.println("Observable has completed")
|
||||
future.complete(Unit)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun onNext(t: Any?) {
|
||||
count++
|
||||
toStream.println("Observation $count: " + printerFun(t))
|
||||
toStream.flush()
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun onError(e: Throwable) {
|
||||
toStream.println("Observable completed with an error")
|
||||
e.printStackTrace()
|
||||
future.completeExceptionally(e)
|
||||
}
|
||||
}
|
||||
|
||||
// Kotlin bug: USELESS_CAST warning is generated below but the IDE won't let us remove it.
|
||||
@Suppress("USELESS_CAST", "UNCHECKED_CAST")
|
||||
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CompletableFuture<Unit>? {
|
||||
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
|
||||
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
|
||||
// object graphs that contain yet more observables. So we just look for top level responses that follow
|
||||
// the standard "track" pattern, and print them until the user presses Ctrl-C
|
||||
if (response == null) return null
|
||||
|
||||
val observable: Observable<*> = when (response) {
|
||||
is Observable<*> -> response
|
||||
is Pair<*, *> -> when {
|
||||
response.first is Observable<*> -> response.first as Observable<*>
|
||||
response.second is Observable<*> -> response.second as Observable<*>
|
||||
else -> null
|
||||
}
|
||||
else -> null
|
||||
} ?: return null
|
||||
|
||||
val subscriber = PrintingSubscriber(printerFun, toStream)
|
||||
(observable as Observable<Any>).subscribe(subscriber)
|
||||
return subscriber.future
|
||||
}
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package net.corda.node
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import org.crsh.command.BaseCommand
|
||||
|
||||
/**
|
||||
* Simply extends CRaSH BaseCommand to add easy access to the RPC ops class.
|
||||
*/
|
||||
open class InteractiveShellCommand : BaseCommand() {
|
||||
fun ops() = context.attributes["ops"] as CordaRPCOps
|
||||
fun services() = context.attributes["services"] as ServiceHubInternal
|
||||
fun objectMapper() = context.attributes["mapper"] as ObjectMapper
|
||||
}
|
@ -164,6 +164,7 @@ fun <A> driver(
|
||||
isDebug: Boolean = false,
|
||||
driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
|
||||
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
|
||||
sshdPortAllocation: PortAllocation = PortAllocation.Incremental(20000),
|
||||
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
|
||||
systemProperties: Map<String, String> = emptyMap(),
|
||||
useTestClock: Boolean = false,
|
||||
@ -172,6 +173,7 @@ fun <A> driver(
|
||||
) = genericDriver(
|
||||
driverDsl = DriverDSL(
|
||||
portAllocation = portAllocation,
|
||||
sshdPortAllocation = sshdPortAllocation,
|
||||
debugPortAllocation = debugPortAllocation,
|
||||
systemProperties = systemProperties,
|
||||
driverDirectory = driverDirectory.toAbsolutePath(),
|
||||
@ -328,6 +330,7 @@ class ShutdownManager(private val executorService: ExecutorService) {
|
||||
|
||||
class DriverDSL(
|
||||
val portAllocation: PortAllocation,
|
||||
val sshdPortAllocation: PortAllocation,
|
||||
val debugPortAllocation: PortAllocation,
|
||||
val systemProperties: Map<String, String>,
|
||||
val driverDirectory: Path,
|
||||
@ -514,6 +517,7 @@ class DriverDSL(
|
||||
override fun startNetworkMapService() {
|
||||
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
|
||||
val apiAddress = portAllocation.nextHostAndPort().toString()
|
||||
val sshdAddress = portAllocation.nextHostAndPort().toString()
|
||||
val baseDirectory = driverDirectory / networkMapLegalName
|
||||
val config = ConfigHelper.loadConfig(
|
||||
baseDirectory = baseDirectory,
|
||||
@ -577,7 +581,8 @@ class DriverDSL(
|
||||
"-XX:+UseG1GC",
|
||||
"-cp", classpath, className,
|
||||
"--base-directory=${nodeConf.baseDirectory}",
|
||||
"--logging-level=$loggingLevel"
|
||||
"--logging-level=$loggingLevel",
|
||||
"--no-local-shell"
|
||||
).filter(String::isNotEmpty)
|
||||
val process = ProcessBuilder(javaArgs)
|
||||
.redirectError((nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log").toFile())
|
||||
|
@ -13,6 +13,7 @@ import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.*
|
||||
@ -183,6 +184,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
@Volatile var started = false
|
||||
private set
|
||||
|
||||
/** The implementation of the [CordaRPCOps] interface used by this node. */
|
||||
open val rpcOps: CordaRPCOps by lazy { CordaRPCOpsImpl(services, smm, database) } // Lazy to avoid init ordering issue with the SMM.
|
||||
|
||||
open fun start(): AbstractNode {
|
||||
require(!started) { "Node has already been started" }
|
||||
|
||||
@ -222,7 +226,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
isPreviousCheckpointsPresent = true
|
||||
false
|
||||
}
|
||||
startMessagingService(CordaRPCOpsImpl(services, smm, database))
|
||||
startMessagingService(rpcOps)
|
||||
services.registerFlowInitiator(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) }
|
||||
runOnStop += Runnable { net.stop() }
|
||||
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
|
||||
|
@ -33,6 +33,7 @@ import java.io.RandomAccessFile
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.nio.channels.FileLock
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import javax.management.ObjectName
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
@ -228,6 +229,8 @@ class Node(override val configuration: FullNodeConfiguration,
|
||||
super.initialiseDatabasePersistence(insideTransaction)
|
||||
}
|
||||
|
||||
val startupComplete = CompletableFuture<Unit>()
|
||||
|
||||
override fun start(): Node {
|
||||
alreadyRunningNodeCheck()
|
||||
super.start()
|
||||
@ -250,6 +253,8 @@ class Node(override val configuration: FullNodeConfiguration,
|
||||
}.
|
||||
build().
|
||||
start()
|
||||
|
||||
startupComplete.complete(Unit)
|
||||
}
|
||||
|
||||
shutdownThread = thread(start = false) {
|
||||
|
40
node/src/main/resources/net/corda/node/shell/base/flow.java
Normal file
40
node/src/main/resources/net/corda/node/shell/base/flow.java
Normal file
@ -0,0 +1,40 @@
|
||||
package net.corda.node;
|
||||
|
||||
// See the comments at the top of run.java
|
||||
|
||||
import org.crsh.cli.*;
|
||||
import org.crsh.command.*;
|
||||
import org.crsh.text.*;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static net.corda.node.InteractiveShell.*;
|
||||
|
||||
@Man(
|
||||
"Allows you to list and start flows. This is the primary way in which you command the node to change the ledger.\n\n" +
|
||||
"This command is generic, so the right way to use it depends on the flow you wish to start. You can use the 'flow start'\n" +
|
||||
"command with either a full class name, or a substring of the class name that's unambiguous. The parameters to the \n" +
|
||||
"flow constructors (the right one is picked automatically) are then specified using the same syntax as for the run command."
|
||||
)
|
||||
@Usage("Start a (work)flow on the node. This is how you can change the ledger.")
|
||||
public class flow extends InteractiveShellCommand {
|
||||
@Command
|
||||
public void start(
|
||||
@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
|
||||
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input
|
||||
) {
|
||||
if (name == null) {
|
||||
out.println("You must pass a name for the flow, see 'man flow'", Color.red);
|
||||
return;
|
||||
}
|
||||
String inp = input == null ? "" : String.join(" ", input).trim();
|
||||
runFlowByNameFragment(name, inp, out);
|
||||
}
|
||||
|
||||
@Command
|
||||
public void list(InvocationContext<String> context) throws Exception {
|
||||
for (String name : ops().registeredFlows()) {
|
||||
context.provide(name + System.lineSeparator());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package net.corda.node.shell.base
|
||||
|
||||
// Note that this file MUST be in a sub-directory called "base" relative to the path
|
||||
// given in the configuration code in InteractiveShell.
|
||||
|
||||
welcome = """
|
||||
|
||||
Welcome to the Corda interactive shell.
|
||||
Useful commands include 'help' to see what is available, and 'bye' to shut down the node.
|
||||
|
||||
"""
|
||||
|
||||
prompt = { ->
|
||||
return "${new Date()}>>> ";
|
||||
}
|
102
node/src/main/resources/net/corda/node/shell/base/run.java
Normal file
102
node/src/main/resources/net/corda/node/shell/base/run.java
Normal file
@ -0,0 +1,102 @@
|
||||
package net.corda.node;
|
||||
|
||||
import net.corda.core.messaging.*;
|
||||
import net.corda.jackson.*;
|
||||
import org.crsh.cli.*;
|
||||
import org.crsh.command.*;
|
||||
import org.crsh.text.*;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
|
||||
import static net.corda.node.InteractiveShell.*;
|
||||
|
||||
// This file is actually compiled at runtime with a bundled Java compiler by CRaSH. That's pretty weak: being able
|
||||
// to do this is a neat party trick and means people can write new commands in Java then just drop them into
|
||||
// their node directory, but it makes the first usage of the command slower for no good reason. There is a PR
|
||||
// in the upstream CRaSH project that adds an ExternalResolver which might be useful. Then we could convert this
|
||||
// file to Kotlin too.
|
||||
|
||||
public class run extends InteractiveShellCommand {
|
||||
@Command
|
||||
@Man(
|
||||
"Runs a method from the CordaRPCOps interface, which is the same interface exposed to RPC clients.\n\n" +
|
||||
|
||||
"You can learn more about what commands are available by typing 'run' just by itself, or by\n" +
|
||||
"consulting the developer guide at https://docs.corda.net/api/kotlin/corda/net.corda.core.messaging/-corda-r-p-c-ops/index.html"
|
||||
)
|
||||
@Usage("runs a method from the CordaRPCOps interface on the node.")
|
||||
public Object main(
|
||||
InvocationContext<Map> context,
|
||||
@Usage("The command to run") @Argument(unquote = false) List<String> command
|
||||
) {
|
||||
StringToMethodCallParser<CordaRPCOps> parser = new StringToMethodCallParser<>(CordaRPCOps.class, objectMapper());
|
||||
|
||||
if (command == null) {
|
||||
emitHelp(context, parser);
|
||||
return null;
|
||||
}
|
||||
|
||||
String cmd = String.join(" ", command).trim();
|
||||
if (cmd.toLowerCase().startsWith("startflow")) {
|
||||
// The flow command provides better support and startFlow requires special handling anyway due to
|
||||
// the generic startFlow RPC interface which offers no type information with which to parse the
|
||||
// string form of the command.
|
||||
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow);
|
||||
return null;
|
||||
}
|
||||
|
||||
Object result = null;
|
||||
try {
|
||||
StringToMethodCallParser.ParsedMethodCall call = parser.parse(ops(), cmd);
|
||||
result = call.call();
|
||||
result = processResult(result);
|
||||
} catch (StringToMethodCallParser.UnparseableCallException e) {
|
||||
out.println(e.getMessage(), Color.red);
|
||||
out.println("Please try 'man run' to learn what syntax is acceptable", Color.red);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private Object processResult(Object result) {
|
||||
if (result != null && !(result instanceof kotlin.Unit) && !(result instanceof Void)) {
|
||||
result = printAndFollowRPCResponse(RpcResponsePrintingFormat.yaml, result, out);
|
||||
}
|
||||
if (result instanceof Future) {
|
||||
Future future = (Future) result;
|
||||
if (!future.isDone()) {
|
||||
out.println("Waiting for completion or Ctrl-C ... ");
|
||||
out.flush();
|
||||
}
|
||||
try {
|
||||
result = future.get();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {
|
||||
// Sends data down the pipeline about what commands are available. CRaSH will render it nicely.
|
||||
// Each element we emit is a map of column -> content.
|
||||
Map<String, String> cmdsAndArgs = parser.getAvailableCommands();
|
||||
for (Map.Entry<String, String> entry : cmdsAndArgs.entrySet()) {
|
||||
// Skip these entries as they aren't really interesting for the user.
|
||||
if (entry.getKey().equals("startFlowDynamic")) continue;
|
||||
if (entry.getKey().equals("getProtocolVersion")) continue;
|
||||
|
||||
// Use a LinkedHashMap to ensure that the Command column comes first.
|
||||
Map<String, String> m = new LinkedHashMap<>();
|
||||
m.put("Command", entry.getKey());
|
||||
m.put("Parameter types", entry.getValue());
|
||||
try {
|
||||
context.provide(m);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -21,7 +21,9 @@ class ArgsParserTest {
|
||||
logToConsole = false,
|
||||
loggingLevel = Level.INFO,
|
||||
isRegistration = false,
|
||||
isVersion = false))
|
||||
isVersion = false,
|
||||
noLocalShell = false,
|
||||
sshdServer = false))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
94
node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt
Normal file
94
node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt
Normal file
@ -0,0 +1,94 @@
|
||||
package net.corda.node
|
||||
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.contracts.Amount
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.DUMMY_PUBKEY_1
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
import net.corda.jackson.JacksonSupport
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
import org.junit.Test
|
||||
import org.slf4j.Logger
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class InteractiveShellTest {
|
||||
@Suppress("UNUSED")
|
||||
class FlowA(val a: String) : FlowLogic<String>() {
|
||||
constructor(b: Int) : this(b.toString())
|
||||
constructor(b: Int, c: String) : this(b.toString() + c)
|
||||
constructor(amount: Amount<Currency>) : this(amount.toString())
|
||||
constructor(pair: Pair<Amount<Currency>, SecureHash.SHA256>) : this(pair.toString())
|
||||
constructor(party: Party) : this(party.name)
|
||||
override fun call() = a
|
||||
}
|
||||
|
||||
private val ids = InMemoryIdentityService().apply { registerIdentity(Party("SomeCorp", DUMMY_PUBKEY_1)) }
|
||||
private val om = JacksonSupport.createInMemoryMapper(ids, YAMLFactory())
|
||||
|
||||
private fun check(input: String, expected: String) {
|
||||
var output: DummyFSM? = null
|
||||
InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om)
|
||||
assertEquals(expected, output!!.logic.a, input)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun success() {
|
||||
check("a: Hi there", "Hi there")
|
||||
check("b: 12", "12")
|
||||
check("b: 12, c: Yo", "12Yo")
|
||||
}
|
||||
|
||||
@Test fun complex1() = check("amount: £10", "10.00 GBP")
|
||||
|
||||
@Test fun complex2() = check(
|
||||
"pair: { first: $100.12, second: df489807f81c8c8829e509e1bcb92e6692b9dd9d624b7456435cb2f51dc82587 }",
|
||||
"($100.12, df489807f81c8c8829e509e1bcb92e6692b9dd9d624b7456435cb2f51dc82587)"
|
||||
)
|
||||
|
||||
@Test(expected = InteractiveShell.NoApplicableConstructor::class)
|
||||
fun noArgs() = check("", "")
|
||||
|
||||
@Test(expected = InteractiveShell.NoApplicableConstructor::class)
|
||||
fun missingParam() = check("c: Yo", "")
|
||||
|
||||
@Test(expected = InteractiveShell.NoApplicableConstructor::class)
|
||||
fun tooManyArgs() = check("b: 12, c: Yo, d: Bar", "")
|
||||
|
||||
@Test
|
||||
fun party() = check("party: SomeCorp", "SomeCorp")
|
||||
|
||||
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> {
|
||||
override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
|
||||
throw UnsupportedOperationException("not implemented")
|
||||
}
|
||||
|
||||
override fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
|
||||
throw UnsupportedOperationException("not implemented")
|
||||
}
|
||||
|
||||
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
|
||||
throw UnsupportedOperationException("not implemented")
|
||||
}
|
||||
|
||||
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
|
||||
throw UnsupportedOperationException("not implemented")
|
||||
}
|
||||
|
||||
override val serviceHub: ServiceHub
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val logger: Logger
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val id: StateMachineRunId
|
||||
get() = throw UnsupportedOperationException()
|
||||
override val resultFuture: ListenableFuture<Any?>
|
||||
get() = throw UnsupportedOperationException()
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user