Add stateMachines CRaSH command (#800)

Add flow watch command to CRaSH shell.
This commit is contained in:
Katarzyna Streich 2017-06-08 12:54:51 +01:00 committed by GitHub
parent 65fe7d6b81
commit d7635bcbda
3 changed files with 174 additions and 3 deletions

View File

@ -5,20 +5,22 @@ package net.corda.node.shell;
import org.crsh.cli.*;
import org.crsh.command.*;
import org.crsh.text.*;
import org.crsh.text.ui.TableElement;
import java.util.*;
import static net.corda.node.shell.InteractiveShell.*;
@Man(
"Allows you to list and start flows. This is the primary way in which you command the node to change the ledger.\n\n" +
"Allows you to start flows, list the ones available and to watch flows currently running on the node.\n\n" +
"Starting flow is the primary way in which you command the node to change the ledger.\n\n" +
"This command is generic, so the right way to use it depends on the flow you wish to start. You can use the 'flow start'\n" +
"command with either a full class name, or a substring of the class name that's unambiguous. The parameters to the \n" +
"flow constructors (the right one is picked automatically) are then specified using the same syntax as for the run command."
)
@Usage("Start a (work)flow on the node. This is how you can change the ledger.")
public class FlowShellCommand extends InteractiveShellCommand {
@Command
@Usage("Start a (work)flow on the node. This is how you can change the ledger.")
public void start(
@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input
@ -26,7 +28,16 @@ public class FlowShellCommand extends InteractiveShellCommand {
startFlow(name, input, out);
}
static void startFlow(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name, @Usage("The data to pass as input") @Argument(unquote = false) List<String> input, RenderPrintWriter out) {
// TODO Limit number of flows shown option?
@Command
@Usage("watch information about state machines running on the node with result information")
public void watch(InvocationContext<TableElement> context) throws Exception {
runStateMachinesView(out, context);
}
static void startFlow(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input,
RenderPrintWriter out) {
if (name == null) {
out.println("You must pass a name for the flow, see 'man flow'", Color.red);
return;
@ -36,6 +47,7 @@ public class FlowShellCommand extends InteractiveShellCommand {
}
@Command
@Usage("list flows that user can start")
public void list(InvocationContext<String> context) throws Exception {
for (String name : ops().registeredFlows()) {
context.provide(name + System.lineSeparator());

View File

@ -0,0 +1,128 @@
package net.corda.node.shell
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.ErrorOr
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.then
import net.corda.core.transactions.SignedTransaction
import org.crsh.text.Color
import org.crsh.text.Decoration
import org.crsh.text.RenderPrintWriter
import org.crsh.text.ui.LabelElement
import org.crsh.text.ui.TableElement
import org.crsh.text.ui.Overflow
import org.crsh.text.ui.RowElement
import rx.Subscriber
class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Subscriber<Any>() {
private val indexMap = HashMap<StateMachineRunId, Int>()
private val table = createStateMachinesTable()
val future: SettableFuture<Unit> = SettableFuture.create()
init {
// The future is public and can be completed by something else to indicate we don't wish to follow
// anymore (e.g. the user pressing Ctrl-C).
future then { unsubscribe() }
}
@Synchronized
override fun onCompleted() {
// The observable of state machines will never complete.
future.set(Unit)
}
@Synchronized
override fun onNext(t: Any?) {
if (t is StateMachineUpdate) {
toStream.cls()
createStateMachinesRow(t)
toStream.print(table)
toStream.println("Waiting for completion or Ctrl-C ... ")
toStream.flush()
}
}
@Synchronized
override fun onError(e: Throwable) {
toStream.println("Observable completed with an error")
future.setException(e)
}
private fun stateColor(smmUpdate: StateMachineUpdate): Color {
return when(smmUpdate){
is StateMachineUpdate.Added -> Color.blue
is StateMachineUpdate.Removed -> smmUpdate.result.match({ Color.green } , { Color.red })
}
}
private fun createStateMachinesTable(): TableElement {
val table = TableElement(1,2,1,2).overflow(Overflow.HIDDEN).rightCellPadding(1)
val header = RowElement(true).add("Id", "Flow name", "Initiator", "Status").style(Decoration.bold.fg(Color.black).bg(Color.white))
table.add(header)
return table
}
// TODO Add progress tracker?
private fun createStateMachinesRow(smmUpdate: StateMachineUpdate) {
when (smmUpdate) {
is StateMachineUpdate.Added -> {
table.add(RowElement().add(
LabelElement(formatFlowId(smmUpdate.id)),
LabelElement(formatFlowName(smmUpdate.stateMachineInfo.flowLogicClassName)),
LabelElement(formatFlowInitiator(smmUpdate.stateMachineInfo.initiator)),
LabelElement("In progress")
).style(stateColor(smmUpdate).fg()))
indexMap[smmUpdate.id] = table.rows.size - 1
}
is StateMachineUpdate.Removed -> {
val idx = indexMap[smmUpdate.id]
if (idx != null) {
val oldRow = table.rows[idx]
val flowNameLabel = oldRow.getCol(1) as LabelElement
val flowInitiatorLabel = oldRow.getCol(2) as LabelElement
table.rows[idx] = RowElement().add(
LabelElement(formatFlowId(smmUpdate.id)),
LabelElement(flowNameLabel.value),
LabelElement(flowInitiatorLabel.value),
LabelElement(formatFlowResult(smmUpdate.result))
).style(stateColor(smmUpdate).fg())
}
}
}
}
private fun formatFlowName(flowName: String): String {
val camelCaseRegex = Regex("(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])")
val name = flowName.split('.', '$').last()
// Split CamelCase and get rid of "flow" at the end if present.
return camelCaseRegex.split(name).filter { it.compareTo("Flow", true) != 0 }.joinToString(" ")
}
private fun formatFlowId(flowId: StateMachineRunId): String {
return flowId.toString().removeSurrounding("[","]")
}
private fun formatFlowInitiator(flowInitiator: FlowInitiator): String {
return when (flowInitiator) {
is FlowInitiator.Scheduled -> flowInitiator.scheduledState.ref.toString()
is FlowInitiator.Shell -> "Shell" // TODO Change when we will have more information on shell user.
is FlowInitiator.Peer -> flowInitiator.party.name.commonName
is FlowInitiator.RPC -> "RPC: " + flowInitiator.username
}
}
private fun formatFlowResult(flowResult: ErrorOr<*>): String {
fun successFormat(value: Any?): String {
return when(value) {
is SignedTransaction -> "Tx ID: " + value.id.toString()
is kotlin.Unit -> "No return value"
null -> "No return value"
else -> value.toString()
}
}
return flowResult.match({ successFormat(it) }, { it.message ?: it.toString() })
}
}

View File

@ -14,6 +14,8 @@ import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.utilities.Emoji
import net.corda.core.utilities.loggerFor
import net.corda.jackson.JacksonSupport
@ -40,6 +42,7 @@ import org.crsh.shell.ShellFactory
import org.crsh.shell.impl.command.ExternalResolver
import org.crsh.text.Color
import org.crsh.text.RenderPrintWriter
import org.crsh.text.ui.TableElement
import org.crsh.util.InterruptHandler
import org.crsh.util.Utils
import org.crsh.vfs.FS
@ -304,6 +307,34 @@ object InteractiveShell {
throw NoApplicableConstructor(errors)
}
// TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense.
@JvmStatic
fun runStateMachinesView(out: RenderPrintWriter, context: InvocationContext<TableElement>): Any? {
val proxy = node.rpcOps
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesAndUpdates()
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
val subscriber = FlowWatchPrintingSubscriber(out)
stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber)
var result: Any? = subscriber.future
if (result is Future<*>) {
if (!result.isDone) {
out.cls()
out.println("Waiting for completion or Ctrl-C ... ")
out.flush()
}
try {
result = result.get()
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
} catch (e: ExecutionException) {
throw e.rootCause
} catch (e: InvocationTargetException) {
throw e.rootCause
}
}
return result
}
@JvmStatic
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>): Any? {
val parser = StringToMethodCallParser(CordaRPCOps::class.java, context.attributes["mapper"] as ObjectMapper)