CORDA-311 Shell via SSH server (#2087)

* SSH server integration
This commit is contained in:
Maksymilian Pawlak
2017-11-20 17:41:38 +00:00
committed by GitHub
parent 6a2c170b82
commit e63b6d1386
32 changed files with 1048 additions and 235 deletions

View File

@ -0,0 +1,170 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import com.jcraft.jsch.ChannelExec
import com.jcraft.jsch.JSch
import com.jcraft.jsch.JSchException
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.driver.driver
import org.bouncycastle.util.io.Streams
import org.junit.Test
import net.corda.node.services.Permissions.Companion.startFlow
import java.net.ConnectException
import kotlin.test.assertTrue
import kotlin.test.fail
import org.assertj.core.api.Assertions.assertThat
import java.util.regex.Pattern
class SSHServerTest {
@Test()
fun `ssh server does not start be default`() {
val user = User("u", "p", setOf())
// The driver will automatically pick up the annotated flows below
driver() {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user))
node.getOrThrow()
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
session.setPassword("p")
try {
session.connect()
fail()
} catch (e:JSchException) {
assertTrue(e.cause is ConnectException)
}
}
}
@Test
fun `ssh server starts when configured`() {
val user = User("u", "p", setOf())
// The driver will automatically pick up the annotated flows below
driver {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
node.getOrThrow()
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
session.setPassword("p")
session.connect()
assertTrue(session.isConnected)
}
}
@Test
fun `ssh server verify credentials`() {
val user = User("u", "p", setOf())
// The driver will automatically pick up the annotated flows below
driver {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
node.getOrThrow()
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
session.setPassword("p_is_bad_password")
try {
session.connect()
fail("Server should reject invalid credentials")
} catch (e: JSchException) {
//There is no specialized exception for this
assertTrue(e.message == "Auth fail")
}
}
}
@Test
fun `ssh respects permissions`() {
val user = User("u", "p", setOf(startFlow<FlowICanRun>()))
// The driver will automatically pick up the annotated flows below
driver(isDebug = true) {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
node.getOrThrow()
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
session.setPassword("p")
session.connect()
assertTrue(session.isConnected)
val channel = session.openChannel("exec") as ChannelExec
channel.setCommand("start FlowICannotRun otherParty: \"O=Alice Corp,L=Madrid,C=ES\"")
channel.connect()
val response = String(Streams.readAll(channel.inputStream))
val flowNameEscaped = Pattern.quote("StartFlow.${SSHServerTest::class.qualifiedName}$${FlowICannotRun::class.simpleName}")
channel.disconnect()
session.disconnect()
assertThat(response).matches("(?s)User not permissioned with any of \\[[^]]*${flowNameEscaped}.*")
}
}
@Test
fun `ssh runs flows`() {
val user = User("u", "p", setOf(startFlow<FlowICanRun>()))
// The driver will automatically pick up the annotated flows below
driver(isDebug = true) {
val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user),
customOverrides = mapOf("sshd" to mapOf("port" to 2222)))
node.getOrThrow()
val session = JSch().getSession("u", "localhost", 2222)
session.setConfig("StrictHostKeyChecking", "no")
session.setPassword("p")
session.connect()
assertTrue(session.isConnected)
val channel = session.openChannel("exec") as ChannelExec
channel.setCommand("start FlowICanRun")
channel.connect()
val response = String(Streams.readAll(channel.inputStream))
//There are ANSI control characters involved, so we want to avoid direct byte to byte matching
assertThat(response.lines()).filteredOn( { it.contains("") && it.contains("Done")}).hasSize(1)
}
}
@StartableByRPC
@InitiatingFlow
class FlowICanRun : FlowLogic<String>() {
private val HELLO_STEP = ProgressTracker.Step("Hello")
@Suspendable
override fun call(): String {
progressTracker?.currentStep = HELLO_STEP
return "bambam"
}
override val progressTracker: ProgressTracker? = ProgressTracker(HELLO_STEP)
}
@StartableByRPC
@InitiatingFlow
class FlowICannotRun(val otherParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String = initiateFlow(otherParty).receive<String>().unwrap { it }
override val progressTracker: ProgressTracker? = ProgressTracker()
}
}

View File

@ -2,6 +2,9 @@ package net.corda.node.shell;
// See the comments at the top of run.java
import net.corda.core.messaging.CordaRPCOps;
import net.corda.node.utilities.ANSIProgressRenderer;
import net.corda.node.utilities.CRaSHNSIProgressRenderer;
import org.crsh.cli.*;
import org.crsh.command.*;
import org.crsh.text.*;
@ -9,6 +12,7 @@ import org.crsh.text.ui.TableElement;
import java.util.*;
import static net.corda.node.services.messaging.RPCServerKt.CURRENT_RPC_CONTEXT;
import static net.corda.node.shell.InteractiveShell.*;
@Man(
@ -25,25 +29,27 @@ public class FlowShellCommand extends InteractiveShellCommand {
@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input
) {
startFlow(name, input, out);
startFlow(name, input, out, ops(), ansiProgressRenderer());
}
// TODO Limit number of flows shown option?
@Command
@Usage("watch information about state machines running on the node with result information")
public void watch(InvocationContext<TableElement> context) throws Exception {
runStateMachinesView(out);
runStateMachinesView(out, ops());
}
static void startFlow(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input,
RenderPrintWriter out) {
RenderPrintWriter out,
CordaRPCOps rpcOps,
ANSIProgressRenderer ansiProgressRenderer) {
if (name == null) {
out.println("You must pass a name for the flow, see 'man flow'", Color.red);
return;
}
String inp = input == null ? "" : String.join(" ", input).trim();
runFlowByNameFragment(name, inp, out);
runFlowByNameFragment(name, inp, out, rpcOps, ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out) );
}
@Command

View File

@ -2,6 +2,8 @@ package net.corda.node.shell;
// A simple forwarder to the "flow start" command, for easier typing.
import net.corda.node.utilities.ANSIProgressRenderer;
import net.corda.node.utilities.CRaSHNSIProgressRenderer;
import org.crsh.cli.*;
import java.util.*;
@ -11,6 +13,7 @@ public class StartShellCommand extends InteractiveShellCommand {
@Man("An alias for 'flow start'. Example: \"start Yo target: Some other company\"")
public void main(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input) {
FlowShellCommand.startFlow(name, input, out);
ANSIProgressRenderer ansiProgressRenderer = ansiProgressRenderer();
FlowShellCommand.startFlow(name, input, out, ops(), ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out));
}
}

View File

@ -36,6 +36,7 @@ import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.RPCUserService
import net.corda.node.services.api.*
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NodeConfiguration
@ -55,6 +56,7 @@ import net.corda.node.services.transactions.*
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
@ -130,6 +132,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val _nodeReadyFuture = openFuture<Unit>()
protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) }
lateinit var userService: RPCUserService get
/** Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database */
val nodeReadyFuture: CordaFuture<Unit>
@ -213,6 +217,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
runOnStop += network::stop
startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
@ -243,6 +250,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
open fun startShell(rpcOps: CordaRPCOps) {
InteractiveShell.startShell(configuration, rpcOps, userService, _services.identityService, _services.database)
}
private fun initNodeInfo(): Pair<Set<KeyPair>, NodeInfo> {
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
val keyPairs = mutableSetOf(identityKeyPair)

View File

@ -142,7 +142,9 @@ internal class CordaRPCOpsImpl(
return FlowProgressHandleImpl(
id = stateMachine.id,
returnValue = stateMachine.resultFuture,
progress = stateMachine.logic.track()?.updates ?: Observable.empty()
progress = stateMachine.logic.track()?.updates ?: Observable.empty(),
stepsTreeIndexFeed = stateMachine.logic.trackStepsTreeIndex(),
stepsTreeFeed = stateMachine.logic.trackStepsTree()
)
}

View File

@ -131,7 +131,6 @@ open class Node(configuration: NodeConfiguration,
private var shutdownHook: ShutdownHook? = null
private lateinit var userService: RPCUserService
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService {
userService = RPCUserServiceImpl(configuration.rpcUsers)

View File

@ -117,12 +117,13 @@ open class NodeStartup(val args: Array<String>) {
Node.printBasicNodeInfo("Node for \"$name\" started up and registered in $elapsed sec")
// Don't start the shell if there's no console attached.
val runShell = !cmdlineOptions.noLocalShell && System.console() != null
startedNode.internals.startupComplete.then {
try {
InteractiveShell.startShell(cmdlineOptions.baseDirectory, runShell, cmdlineOptions.sshdServer, startedNode)
} catch (e: Throwable) {
logger.error("Shell failed to start", e)
if (!cmdlineOptions.noLocalShell && System.console() != null && conf.devMode) {
startedNode.internals.startupComplete.then {
try {
InteractiveShell.runLocalShell(startedNode)
} catch (e: Throwable) {
logger.error("Shell failed to start", e)
}
}
}
},
@ -317,7 +318,6 @@ open class NodeStartup(val args: Array<String>) {
a("--- ${versionInfo.vendor} ${versionInfo.releaseVersion} (${versionInfo.revision.take(7)}) -----------------------------------------------").
newline().
newline().
newline().
reset())
}
}

View File

@ -39,6 +39,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
// TODO Move into DevModeOptions
val useTestClock: Boolean get() = false
val detectPublicIp: Boolean get() = true
val sshd: SSHDConfiguration?
}
fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
@ -109,7 +110,9 @@ data class NodeConfigurationImpl(
override val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis()
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null
) : NodeConfiguration {
override val exportJMXto: String get() = "http"
@ -144,3 +147,5 @@ data class CertChainPolicyConfig(val role: String, private val policy: CertChain
}
}
}
data class SSHDConfiguration(val port: Int)

View File

@ -23,6 +23,7 @@ data class RpcPermissions(private val values: Set<String> = emptySet()) {
companion object {
val NONE = RpcPermissions()
val ALL = RpcPermissions(setOf("ALL"))
}
fun coverAny(permissions: Set<String>) = !values.intersect(permissions + Permissions.all()).isEmpty()

View File

@ -0,0 +1,35 @@
package net.corda.node.shell
import net.corda.core.context.Actor
import net.corda.core.context.InvocationContext
import net.corda.core.identity.CordaX500Name
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.services.RPCUserService
import net.corda.node.services.messaging.RpcPermissions
import org.crsh.auth.AuthInfo
import org.crsh.auth.AuthenticationPlugin
import org.crsh.plugin.CRaSHPlugin
class CordaAuthenticationPlugin(val rpcOps:CordaRPCOps, val userService:RPCUserService, val nodeLegalName:CordaX500Name) : CRaSHPlugin<AuthenticationPlugin<String>>(), AuthenticationPlugin<String> {
override fun getImplementation(): AuthenticationPlugin<String> = this
override fun getName(): String = "corda"
override fun authenticate(username: String?, credential: String?): AuthInfo {
if (username == null || credential == null) {
return AuthInfo.UNSUCCESSFUL
}
val user = userService.getUser(username)
if (user != null && user.password == credential) {
val actor = Actor(Actor.Id(username), userService.id, nodeLegalName)
return CordaSSHAuthInfo(true, RPCOpsWithContext(rpcOps, InvocationContext.rpc(actor), RpcPermissions(user.permissions)))
}
return AuthInfo.UNSUCCESSFUL;
}
override fun getCredentialType(): Class<String> = String::class.java
}

View File

@ -0,0 +1,9 @@
package net.corda.node.shell
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.utilities.ANSIProgressRenderer
import org.crsh.auth.AuthInfo
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo {
override fun isSuccessful(): Boolean = successful
}

View File

@ -9,25 +9,32 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.io.Closeables
import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.PermissionException
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.utilities.getOrThrow
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcAuthContext
import net.corda.node.services.messaging.RpcPermissions
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.ANSIProgressRenderer
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext
import org.crsh.console.jline.JLineProcessor
import org.crsh.console.jline.TerminalFactory
@ -77,59 +84,55 @@ object InteractiveShell {
private lateinit var node: StartedNode<Node>
@VisibleForTesting
internal lateinit var database: CordaPersistence
private lateinit var rpcOps:CordaRPCOps
private lateinit var userService:RPCUserService
private lateinit var identityService:IdentityService
private var shell:Shell? = null
private lateinit var nodeLegalName: CordaX500Name
/**
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
* internals.
*/
fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: StartedNode<Node>) {
this.node = node
this.database = node.database
var runSSH = runSSHServer
fun startShell(configuration:NodeConfiguration, cordaRPCOps: CordaRPCOps, userService: RPCUserService, identityService: IdentityService, database:CordaPersistence) {
this.rpcOps = cordaRPCOps
this.userService = userService
this.identityService = identityService
this.nodeLegalName = configuration.myLegalName
this.database = database
val dir = configuration.baseDirectory
val runSshDeamon = configuration.sshd != null
val config = Properties()
if (runSSH) {
// TODO: Finish and enable SSH access.
// This means bringing the CRaSH SSH plugin into the Corda tree and applying Marek's patches
// found in https://github.com/marekdapps/crash/commit/8a37ce1c7ef4d32ca18f6396a1a9d9841f7ff643
// to that local copy, as CRaSH is no longer well maintained by the upstream and the SSH plugin
// that it comes with is based on a very old version of Apache SSHD which can't handle connections
// from newer SSH clients. It also means hooking things up to the authentication system.
Node.printBasicNodeInfo("SSH server access is not fully implemented, sorry.")
runSSH = false
}
if (runSshDeamon) {
val sshKeysDir = dir / "sshkey"
sshKeysDir.toFile().mkdirs()
if (runSSH) {
// Enable SSH access. Note: these have to be strings, even though raw object assignments also work.
config["crash.ssh.keypath"] = (dir / "sshkey").toString()
config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString()
config["crash.ssh.keygen"] = "true"
// config["crash.ssh.port"] = node.configuration.sshdAddress.port.toString()
config["crash.auth"] = "simple"
config["crash.auth.simple.username"] = "admin"
config["crash.auth.simple.password"] = "admin"
config["crash.ssh.port"] = configuration.sshd?.port.toString()
config["crash.auth"] = "corda"
}
ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java)
val shell = ShellLifecycle(dir).start(config)
shell = ShellLifecycle(dir).start(config)
if (runSSH) {
// printBasicNodeInfo("SSH server listening on address", node.configuration.sshdAddress.toString())
if (runSshDeamon) {
Node.printBasicNodeInfo("SSH server listening on port", configuration.sshd!!.port.toString())
}
}
// Possibly bring up a local shell in the launching terminal window, unless it's disabled.
if (!runLocalShell)
return
// TODO: Automatically set up the JDBC sub-command with a connection to the database.
fun runLocalShell(node:StartedNode<Node>) {
val terminal = TerminalFactory.create()
val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal)
val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out)
InterruptHandler { jlineProcessor.interrupt() }.install()
thread(name = "Command line shell processor", isDaemon = true) {
// Give whoever has local shell access administrator access to the node.
// TODO remove this after Shell switches to RPC
val context = RpcAuthContext(net.corda.core.context.InvocationContext.shell(), RpcPermissions.NONE)
val context = RpcAuthContext(net.corda.core.context.InvocationContext.shell(), RpcPermissions.ALL)
CURRENT_RPC_CONTEXT.set(context)
Emoji.renderIfSupported {
jlineProcessor.run()
@ -168,27 +171,25 @@ object InteractiveShell {
// Don't use the Java language plugin (we may not have tools.jar available at runtime), this
// will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that
// is only the 'jmx' command.
return super.getPlugins().filterNot { it is JavaLanguage }
return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps, userService, nodeLegalName)
}
}
val attributes = mapOf(
"node" to node.internals,
"services" to node.services,
"ops" to node.rpcOps,
"ops" to rpcOps,
"mapper" to yamlInputMapper
)
val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader)
context.refresh()
this.config = config
start(context)
return context.getPlugin(ShellFactory::class.java).create(null)
return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, RPCOpsWithContext(rpcOps, net.corda.core.context.InvocationContext.shell(), RpcPermissions.ALL), StdoutANSIProgressRenderer))
}
}
private val yamlInputMapper: ObjectMapper by lazy {
// Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra
// serializers.
JacksonSupport.createInMemoryMapper(node.services.identityService, YAMLFactory(), true).apply {
JacksonSupport.createInMemoryMapper(identityService, YAMLFactory(), true).apply {
val rpcModule = SimpleModule()
rpcModule.addDeserializer(InputStream::class.java, InputStreamDeserializer)
rpcModule.addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer)
@ -217,42 +218,41 @@ object InteractiveShell {
/**
* Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out
* the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using
* the [runFlowFromString] method and starts the requested flow using the [ANSIProgressRenderer] to draw
* the progress tracker. Ctrl-C can be used to cancel.
* the [runFlowFromString] method and starts the requested flow. Ctrl-C can be used to cancel.
*/
@JvmStatic
fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter) {
val matches = node.services.rpcFlows.filter { nameFragment in it.name }
fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer) {
val matches = rpcOps.registeredFlows().filter { nameFragment in it }
if (matches.isEmpty()) {
output.println("No matching flow found, run 'flow list' to see your options.", Color.red)
return
} else if (matches.size > 1) {
output.println("Ambigous name provided, please be more specific. Your options are:")
output.println("Ambiguous name provided, please be more specific. Your options are:")
matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) }
return
}
val clazz: Class<FlowLogic<*>> = uncheckedCast(matches.single())
val clazz: Class<FlowLogic<*>> = uncheckedCast(Class.forName(matches.single()))
try {
// TODO Flow invocation should use startFlowDynamic.
val context = net.corda.core.context.InvocationContext.shell()
val fsm = runFlowFromString({ node.services.startFlow(it, context).getOrThrow() }, inputData, clazz)
// Show the progress tracker on the console until the flow completes or is interrupted with a
// Ctrl-C keypress.
val stateObservable = runFlowFromString({ clazz,args -> rpcOps.startTrackedFlowDynamic (clazz, *args) }, inputData, clazz)
val latch = CountDownLatch(1)
ANSIProgressRenderer.onDone = { latch.countDown() }
ANSIProgressRenderer.progressTracker = (fsm as FlowStateMachineImpl).logic.progressTracker
ansiProgressRenderer.render(stateObservable, { latch.countDown() })
try {
// Wait for the flow to end and the progress tracker to notice. By the time the latch is released
// the tracker is done with the screen.
latch.await()
} catch (e: InterruptedException) {
ANSIProgressRenderer.progressTracker = null
// TODO: When the flow framework allows us to kill flows mid-flight, do so here.
}
} catch (e: NoApplicableConstructor) {
output.println("No matching constructor found:", Color.red)
e.errors.forEach { output.println("- $it", Color.red) }
} catch (e:PermissionException) {
output.println(e.message ?: "Access denied", Color.red)
} finally {
InputStreamDeserializer.closeAll()
}
@ -273,10 +273,10 @@ object InteractiveShell {
* @throws NoApplicableConstructor if no constructor could be found for the given set of types.
*/
@Throws(NoApplicableConstructor::class)
fun runFlowFromString(invoke: (FlowLogic<*>) -> FlowStateMachine<*>,
fun <T> runFlowFromString(invoke: (Class<out FlowLogic<T>>, Array<out Any?>) -> FlowProgressHandle<T>,
inputData: String,
clazz: Class<out FlowLogic<*>>,
om: ObjectMapper = yamlInputMapper): FlowStateMachine<*> {
clazz: Class<out FlowLogic<T>>,
om: ObjectMapper = yamlInputMapper): FlowProgressHandle<T> {
// For each constructor, attempt to parse the input data as a method call. Use the first that succeeds,
// and keep track of the reasons we failed so we can print them out if no constructors are usable.
val parser = StringToMethodCallParser(clazz, om)
@ -303,7 +303,7 @@ object InteractiveShell {
errors.add("A flow must override the progress tracker in order to be run from the shell")
continue
}
return invoke(flow)
return invoke(clazz, args)
} catch (e: StringToMethodCallParser.UnparseableCallException.MissingParameter) {
errors.add("${getPrototype()}: missing parameter ${e.paramName}")
} catch (e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) {
@ -321,8 +321,8 @@ object InteractiveShell {
// TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense.
@JvmStatic
fun runStateMachinesView(out: RenderPrintWriter): Any? {
val proxy = node.rpcOps
fun runStateMachinesView(out: RenderPrintWriter, rpcOps: CordaRPCOps): Any? {
val proxy = rpcOps
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
val subscriber = FlowWatchPrintingSubscriber(out)
@ -395,7 +395,7 @@ object InteractiveShell {
return result
}
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit>? {
private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture<Unit> {
val printerFun = yamlMapper::writeValueAsString
toStream.println(printerFun(response))
toStream.flush()
@ -422,28 +422,31 @@ object InteractiveShell {
override fun onNext(t: Any?) {
count++
toStream.println("Observation $count: " + printerFun(t))
toStream.flush()
}
@Synchronized
override fun onError(e: Throwable) {
toStream.println("Observable completed with an error")
e.printStackTrace()
e.printStackTrace(toStream)
future.setException(e)
}
}
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): OpenFuture<Unit>? {
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture<Unit> {
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
// object graphs that contain yet more observables. So we just look for top level responses that follow
// the standard "track" pattern, and print them until the user presses Ctrl-C
if (response == null) return null
if (response == null) return doneFuture(Unit)
val observable: Observable<*> = when (response) {
is Observable<*> -> response
is DataFeed<*, *> -> response.updates
else -> return null
is DataFeed<*, *> -> {
toStream.println("Snapshot")
toStream.println(response.snapshot)
response.updates
}
else -> return doneFuture(Unit)
}
val subscriber = PrintingSubscriber(printerFun, toStream)

View File

@ -4,12 +4,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.core.messaging.CordaRPCOps
import net.corda.node.services.api.ServiceHubInternal
import org.crsh.command.BaseCommand
import org.crsh.shell.impl.command.CRaSHSession
/**
* Simply extends CRaSH BaseCommand to add easy access to the RPC ops class.
*/
open class InteractiveShellCommand : BaseCommand() {
fun ops() = context.attributes["ops"] as CordaRPCOps
fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps
fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer
fun services() = context.attributes["services"] as ServiceHubInternal
fun objectMapper() = context.attributes["mapper"] as ObjectMapper
}

View File

@ -0,0 +1,210 @@
package net.corda.node.shell
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcAuthContext
import net.corda.node.services.messaging.RpcPermissions
import rx.Observable
import java.io.InputStream
import java.security.PublicKey
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
class RPCOpsWithContext(val cordaRPCOps: CordaRPCOps, val invocationContext:InvocationContext, val rpcPermissions: RpcPermissions) : CordaRPCOps {
class RPCContextRunner<T>(val invocationContext:InvocationContext, val permissions:RpcPermissions, val block:() -> T) : Thread() {
private var result: CompletableFuture<T> = CompletableFuture()
override fun run() {
CURRENT_RPC_CONTEXT.set(RpcAuthContext(invocationContext, permissions))
try {
result.complete(block())
} catch (e:Throwable) {
result.completeExceptionally(e)
}
CURRENT_RPC_CONTEXT.remove()
}
fun get(): Future<T> {
start()
join()
return result
}
}
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.uploadAttachmentWithMetadata(jar, uploader, filename) }.get().getOrThrow()
}
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.queryAttachments(query, sorting) }.get().getOrThrow()
}
override fun nodeStateObservable(): Observable<NodeState> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.nodeStateObservable() }.get().getOrThrow()
}
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackByWithSorting(contractStateType, criteria, sorting) }.get().getOrThrow()
}
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackByWithPagingSpec(contractStateType, criteria, paging) }.get().getOrThrow()
}
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackByCriteria(contractStateType, criteria) }.get().getOrThrow()
}
override fun <T : ContractState> vaultTrack(contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrack(contractStateType) }.get().getOrThrow()
}
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryByWithSorting(contractStateType, criteria, sorting) }.get().getOrThrow()
}
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryByWithPagingSpec(contractStateType, criteria, paging) }.get().getOrThrow()
}
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryByCriteria(criteria, contractStateType) }.get().getOrThrow()
}
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>): Vault.Page<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQuery(contractStateType) }.get().getOrThrow()
}
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
return RPCContextRunner(invocationContext, rpcPermissions, cordaRPCOps::stateMachinesSnapshot).get().getOrThrow()
}
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
return RPCContextRunner(invocationContext, rpcPermissions, cordaRPCOps::stateMachinesFeed).get().getOrThrow()
}
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryBy(criteria, paging, sorting, contractStateType) }.get().getOrThrow()
}
override fun <T : ContractState> vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackBy(criteria, paging, sorting, contractStateType) }.get().getOrThrow()
}
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.internalVerifiedTransactionsSnapshot() }.get().getOrThrow()
}
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.internalVerifiedTransactionsFeed() }.get().getOrThrow()
}
override fun stateMachineRecordedTransactionMappingSnapshot(): List<StateMachineTransactionMapping> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.stateMachineRecordedTransactionMappingSnapshot() }.get().getOrThrow()
}
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.stateMachineRecordedTransactionMappingFeed() }.get().getOrThrow()
}
override fun networkMapSnapshot(): List<NodeInfo> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.networkMapSnapshot() }.get().getOrThrow()
}
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.networkMapFeed() }.get().getOrThrow()
}
override fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.startFlowDynamic(logicType, *args) }.get().getOrThrow()
}
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.startTrackedFlowDynamic(logicType, *args) }.get().getOrThrow()
}
override fun nodeInfo(): NodeInfo {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.nodeInfo() }.get().getOrThrow()
}
override fun notaryIdentities(): List<Party> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.notaryIdentities() }.get().getOrThrow()
}
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.addVaultTransactionNote(txnId, txnNote) }.get().getOrThrow()
}
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.getVaultTransactionNotes(txnId) }.get().getOrThrow()
}
override fun attachmentExists(id: SecureHash): Boolean {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.attachmentExists(id) }.get().getOrThrow()
}
override fun openAttachment(id: SecureHash): InputStream {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.openAttachment(id) }.get().getOrThrow()
}
override fun uploadAttachment(jar: InputStream): SecureHash {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.uploadAttachment(jar) }.get().getOrThrow()
}
override fun currentNodeTime(): Instant {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.currentNodeTime() }.get().getOrThrow()
}
override fun waitUntilNetworkReady(): CordaFuture<Void?> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.waitUntilNetworkReady() }.get().getOrThrow()
}
override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.wellKnownPartyFromAnonymous(party) }.get().getOrThrow()
}
override fun partyFromKey(key: PublicKey): Party? {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.partyFromKey(key) }.get().getOrThrow()
}
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.wellKnownPartyFromX500Name(x500Name) }.get().getOrThrow()
}
override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.notaryPartyFromX500Name(x500Name) }.get().getOrThrow()
}
override fun partiesFromName(query: String, exactMatch: Boolean): Set<Party> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.partiesFromName(query, exactMatch) }.get().getOrThrow()
}
override fun registeredFlows(): List<String> {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.registeredFlows() }.get().getOrThrow()
}
override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.nodeInfoFromParty(party) }.get().getOrThrow()
}
override fun clearNetworkMapCache() {
return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.clearNetworkMapCache() }.get().getOrThrow()
}
}

View File

@ -1,138 +1,113 @@
package net.corda.node.utilities
import net.corda.core.internal.Emoji
import net.corda.core.utilities.ProgressTracker
import net.corda.node.utilities.ANSIProgressRenderer.progressTracker
import net.corda.core.messaging.FlowProgressHandle
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LogEvent
import org.apache.logging.log4j.core.LoggerContext
import org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender
import org.apache.logging.log4j.core.appender.ConsoleAppender
import org.apache.logging.log4j.core.appender.OutputStreamManager
import org.crsh.text.RenderPrintWriter
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import org.fusesource.jansi.AnsiOutputStream
import rx.Subscription
/**
* Knows how to render a [ProgressTracker] to the terminal using coloured, emoji-fied output. Useful when writing small
* command line tools, demos, tests etc. Just set the [progressTracker] field and it will go ahead and start drawing
* if the terminal supports it. Otherwise it just prints out the name of the step whenever it changes.
*
* When a progress tracker is on the screen, it takes over the bottom part and reconfigures logging so that, assuming
* 1 log event == 1 line, the progress tracker is always glued to the bottom and logging scrolls above it.
*
* TODO: More thread safety
*/
object ANSIProgressRenderer {
abstract class ANSIProgressRenderer {
private var subscriptionIndex: Subscription? = null
private var subscriptionTree: Subscription? = null
protected var usingANSI = false
protected var checkEmoji = false
protected var treeIndex: Int = 0
protected var tree: List<Pair<Int,String>> = listOf()
private var installedYet = false
private var subscription: Subscription? = null
private var usingANSI = false
var progressTracker: ProgressTracker? = null
set(value) {
subscription?.unsubscribe()
field = value
if (!installedYet) {
setup()
}
// Reset the state when a new tracker is wired up.
if (value != null) {
prevMessagePrinted = null
prevLinesDrawn = 0
draw(true)
subscription = value.changes.subscribe({ draw(true) }, { done(it) }, { done(null) })
}
}
var onDone: () -> Unit = {}
private fun done(error: Throwable?) {
if (error == null) progressTracker = null
draw(true, error)
onDone()
}
private fun setup() {
AnsiConsole.systemInstall()
// This line looks weird as hell because the magic code to decide if we really have a TTY or not isn't
// actually exposed anywhere as a function (weak sauce). So we have to rely on our knowledge of jansi
// implementation details.
usingANSI = AnsiConsole.wrapOutputStream(System.out) !is AnsiOutputStream
if (usingANSI) {
// This super ugly code hacks into log4j and swaps out its console appender for our own. It's a bit simpler
// than doing things the official way with a dedicated plugin, etc, as it avoids mucking around with all
// the config XML and lifecycle goop.
val manager = LogManager.getContext(false) as LoggerContext
val consoleAppender = manager.configuration.appenders.values.filterIsInstance<ConsoleAppender>().single { it.name == "Console-Appender" }
val scrollingAppender = object : AbstractOutputStreamAppender<OutputStreamManager>(
consoleAppender.name, consoleAppender.layout, consoleAppender.filter,
consoleAppender.ignoreExceptions(), true, consoleAppender.manager) {
override fun append(event: LogEvent) {
// We lock on the renderer to avoid threads that are logging to the screen simultaneously messing
// things up. Of course this slows stuff down a bit, but only whilst this little utility is in use.
// Eventually it will be replaced with a real GUI and we can delete all this.
synchronized(ANSIProgressRenderer) {
if (progressTracker != null) {
val ansi = Ansi.ansi()
repeat(prevLinesDrawn) { ansi.eraseLine().cursorUp(1).eraseLine() }
System.out.print(ansi)
System.out.flush()
}
super.append(event)
if (progressTracker != null)
draw(false)
}
}
}
scrollingAppender.start()
manager.configuration.appenders[consoleAppender.name] = scrollingAppender
val loggerConfigs = manager.configuration.loggers.values
for (config in loggerConfigs) {
val appenderRefs = config.appenderRefs
val consoleAppenders = config.appenders.filter { it.value is ConsoleAppender }.keys
consoleAppenders.forEach { config.removeAppender(it) }
appenderRefs.forEach { config.addAppender(manager.configuration.appenders[it.ref], it.level, it.filter) }
}
manager.updateLoggers()
}
installedYet = true
}
private var onDone: () -> Unit = {}
// prevMessagePrinted is just for non-ANSI mode.
private var prevMessagePrinted: String? = null
// prevLinesDraw is just for ANSI mode.
private var prevLinesDrawn = 0
protected var prevLinesDrawn = 0
@Synchronized private fun draw(moveUp: Boolean, error: Throwable? = null) {
private fun done(error: Throwable?) {
if (error == null) _render(null)
draw(true, error)
onDone()
}
fun render(flowProgressHandle: FlowProgressHandle<*>, onDone: () -> Unit = {}) {
this.onDone = onDone
_render(flowProgressHandle)
}
protected abstract fun printLine(line:String)
protected abstract fun printAnsi(ansi:Ansi)
protected abstract fun setup()
private fun _render(flowProgressHandle: FlowProgressHandle<*>?) {
subscriptionIndex?.unsubscribe()
subscriptionTree?.unsubscribe()
treeIndex = 0
tree = listOf()
if (!installedYet) {
setup()
installedYet = true
}
prevMessagePrinted = null
prevLinesDrawn = 0
draw(true)
flowProgressHandle?.apply {
stepsTreeIndexFeed?.apply {
treeIndex = snapshot
subscriptionIndex = updates.subscribe({
treeIndex = it
draw(true)
}, { done(it) }, { done(null) })
}
stepsTreeFeed?.apply {
tree = snapshot
subscriptionTree = updates.subscribe({
tree = it
draw(true)
}, { done(it) }, { done(null) })
}
}
}
@Synchronized protected fun draw(moveUp: Boolean, error: Throwable? = null) {
if (!usingANSI) {
val currentMessage = progressTracker?.currentStepRecursive?.label
val currentMessage = tree.getOrNull(treeIndex)?.second
if (currentMessage != null && currentMessage != prevMessagePrinted) {
println(currentMessage)
printLine(currentMessage)
prevMessagePrinted = currentMessage
}
return
}
Emoji.renderIfSupported {
fun printingBody() {
// Handle the case where the number of steps in a progress tracker is changed during execution.
val ansi = Ansi.ansi()
val ansi = Ansi()
if (prevLinesDrawn > 0 && moveUp)
ansi.cursorUp(prevLinesDrawn)
// Put a blank line between any logging and us.
ansi.eraseLine()
ansi.newline()
val pt = progressTracker ?: return
var newLinesDrawn = 1 + pt.renderLevel(ansi, 0, error != null)
if (tree.isEmpty()) return
var newLinesDrawn = 1 + renderLevel(ansi, error != null)
if (error != null) {
ansi.a("${Emoji.skullAndCrossbones} ${error.message}")
@ -152,46 +127,137 @@ object ANSIProgressRenderer {
}
prevLinesDrawn = newLinesDrawn
// Need to force a flush here in order to ensure stderr/stdout sync up properly.
System.out.print(ansi)
System.out.flush()
printAnsi(ansi)
}
if (checkEmoji) {
Emoji.renderIfSupported(::printingBody)
} else {
printingBody()
}
}
// Returns number of lines rendered.
private fun ProgressTracker.renderLevel(ansi: Ansi, indent: Int, error: Boolean): Int {
private fun renderLevel(ansi: Ansi, error: Boolean): Int {
with(ansi) {
var lines = 0
for ((index, step) in steps.withIndex()) {
// Don't bother rendering these special steps in some cases.
if (step == ProgressTracker.UNSTARTED) continue
if (indent > 0 && step == ProgressTracker.DONE) continue
for ((index, step) in tree.withIndex()) {
val marker = when {
index < stepIndex -> "${Emoji.greenTick} "
index == stepIndex && step == ProgressTracker.DONE -> "${Emoji.greenTick} "
index == stepIndex -> "${Emoji.rightArrow} "
index < treeIndex -> "${Emoji.greenTick} "
treeIndex == tree.lastIndex -> "${Emoji.greenTick} "
index == treeIndex -> "${Emoji.rightArrow} "
error -> "${Emoji.noEntry} "
else -> " " // Not reached yet.
}
a(" ".repeat(indent))
a(" ".repeat(step.first))
a(marker)
val active = index == stepIndex && step != ProgressTracker.DONE
val active = index == treeIndex
if (active) bold()
a(step.label)
a(step.second)
if (active) boldOff()
eraseLine(Ansi.Erase.FORWARD)
newline()
lines++
val child = getChildProgressTracker(step)
if (child != null)
lines += child.renderLevel(ansi, indent + 1, error)
}
return lines
}
}
}
class CRaSHNSIProgressRenderer(val renderPrintWriter:RenderPrintWriter) : ANSIProgressRenderer() {
override fun printLine(line: String) {
renderPrintWriter.println(line)
}
override fun printAnsi(ansi: Ansi) {
renderPrintWriter.print(ansi)
renderPrintWriter.flush()
}
override fun setup() {
//we assume SSH always use ansi
usingANSI = true
}
}
/**
* Knows how to render a [FlowProgressHandle] to the terminal using coloured, emoji-fied output. Useful when writing small
* command line tools, demos, tests etc. Just call [draw] method and it will go ahead and start drawing
* if the terminal supports it. Otherwise it just prints out the name of the step whenever it changes.
*
* When a progress tracker is on the screen, it takes over the bottom part and reconfigures logging so that, assuming
* 1 log event == 1 line, the progress tracker is always glued to the bottom and logging scrolls above it.
*
* TODO: More thread safety
*/
object StdoutANSIProgressRenderer : ANSIProgressRenderer() {
override fun setup() {
AnsiConsole.systemInstall()
checkEmoji = true
// This line looks weird as hell because the magic code to decide if we really have a TTY or not isn't
// actually exposed anywhere as a function (weak sauce). So we have to rely on our knowledge of jansi
// implementation details.
usingANSI = AnsiConsole.wrapOutputStream(System.out) !is AnsiOutputStream
if (usingANSI) {
// This super ugly code hacks into log4j and swaps out its console appender for our own. It's a bit simpler
// than doing things the official way with a dedicated plugin, etc, as it avoids mucking around with all
// the config XML and lifecycle goop.
val manager = LogManager.getContext(false) as LoggerContext
val consoleAppender = manager.configuration.appenders.values.filterIsInstance<ConsoleAppender>().single { it.name == "Console-Appender" }
val scrollingAppender = object : AbstractOutputStreamAppender<OutputStreamManager>(
consoleAppender.name, consoleAppender.layout, consoleAppender.filter,
consoleAppender.ignoreExceptions(), true, consoleAppender.manager) {
override fun append(event: LogEvent) {
// We lock on the renderer to avoid threads that are logging to the screen simultaneously messing
// things up. Of course this slows stuff down a bit, but only whilst this little utility is in use.
// Eventually it will be replaced with a real GUI and we can delete all this.
synchronized(StdoutANSIProgressRenderer) {
if (tree.isNotEmpty()) {
val ansi = Ansi.ansi()
repeat(prevLinesDrawn) { ansi.eraseLine().cursorUp(1).eraseLine() }
System.out.print(ansi)
System.out.flush()
}
super.append(event)
if (tree.isNotEmpty())
draw(false)
}
}
}
scrollingAppender.start()
manager.configuration.appenders[consoleAppender.name] = scrollingAppender
val loggerConfigs = manager.configuration.loggers.values
for (config in loggerConfigs) {
val appenderRefs = config.appenderRefs
val consoleAppenders = config.appenders.filter { it.value is ConsoleAppender }.keys
consoleAppenders.forEach { config.removeAppender(it) }
appenderRefs.forEach { config.addAppender(manager.configuration.appenders[it.ref], it.level, it.filter) }
}
manager.updateLoggers()
}
}
override fun printLine(line:String) {
System.out.println(line)
}
override fun printAnsi(ansi: Ansi) {
// Need to force a flush here in order to ensure stderr/stdout sync up properly.
System.out.print(ansi)
System.out.flush()
}
}

View File

@ -2,11 +2,17 @@ package net.corda.node
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import net.corda.client.jackson.JacksonSupport
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.objectOrNewInstance
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.shell.InteractiveShell
@ -20,6 +26,7 @@ import net.corda.testing.rigorousMock
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.util.*
import kotlin.test.assertEquals
@ -36,8 +43,8 @@ class InteractiveShellTest {
@Suppress("UNUSED")
class FlowA(val a: String) : FlowLogic<String>() {
constructor(b: Int) : this(b.toString())
constructor(b: Int, c: String) : this(b.toString() + c)
constructor(b: Int?) : this(b.toString())
constructor(b: Int?, c: String) : this(b.toString() + c)
constructor(amount: Amount<Currency>) : this(amount.toString())
constructor(pair: Pair<Amount<Currency>, SecureHash.SHA256>) : this(pair.toString())
constructor(party: Party) : this(party.name.toString())
@ -50,9 +57,16 @@ class InteractiveShellTest {
private val om = JacksonSupport.createInMemoryMapper(ids, YAMLFactory())
private fun check(input: String, expected: String) {
var output: DummyFSM? = null
InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om)
assertEquals(expected, output!!.logic.a, input)
var output: String? = null
InteractiveShell.runFlowFromString( { clazz, args ->
val instance = clazz.getConstructor(*args.map { it!!::class.java }.toTypedArray()).newInstance(*args) as FlowA
output = instance.a
val future = openFuture<String>()
future.set("ABC")
FlowProgressHandleImpl(StateMachineRunId.createRandom(), future, Observable.just("Some string"))
}, input, FlowA::class.java, om)
assertEquals(expected, output!!, input)
}
@Test