From e63b6d13869547f99c0f5d3b40feaa280491237a Mon Sep 17 00:00:00 2001 From: Maksymilian Pawlak <120831+m4ksio@users.noreply.github.com> Date: Mon, 20 Nov 2017 17:41:38 +0000 Subject: [PATCH] CORDA-311 Shell via SSH server (#2087) * SSH server integration --- build.gradle | 1 + constants.properties | 2 +- .../kotlin/net/corda/core/flows/FlowLogic.kt | 14 + .../net/corda/core/messaging/FlowHandle.kt | 14 +- .../corda/core/utilities/ProgressTracker.kt | 88 +++-- .../core/utilities/ProgressTrackerTest.kt | 123 +++++++ docs/source/changelog.rst | 3 + docs/source/corda-configuration-file.rst | 5 + docs/source/hello-world-running.rst | 8 +- docs/source/node-administration.rst | 5 + docs/source/shell.rst | 42 ++- .../finance/flows/CashIssueAndPaymentFlow.kt | 2 +- .../src/main/kotlin/net/corda/plugins/Node.kt | 10 +- node/build.gradle | 11 +- .../kotlin/net/corda/node/SSHServerTest.kt | 170 ++++++++++ .../corda/node/shell/FlowShellCommand.java | 14 +- .../corda/node/shell/StartShellCommand.java | 5 +- .../net/corda/node/internal/AbstractNode.kt | 11 + .../corda/node/internal/CordaRPCOpsImpl.kt | 4 +- .../kotlin/net/corda/node/internal/Node.kt | 1 - .../net/corda/node/internal/NodeStartup.kt | 14 +- .../node/services/config/NodeConfiguration.kt | 7 +- .../node/services/messaging/RpcAuthContext.kt | 1 + .../node/shell/CordaAuthenticationPlugin.kt | 35 ++ .../net/corda/node/shell/CordaSSHAuthInfo.kt | 9 + .../net/corda/node/shell/InteractiveShell.kt | 127 ++++---- .../node/shell/InteractiveShellCommand.kt | 4 +- .../net/corda/node/shell/RPCOpsWithContext.kt | 210 ++++++++++++ .../node/utilities/ANSIProgressRenderer.kt | 302 +++++++++++------- .../net/corda/node/InteractiveShellTest.kt | 24 +- .../kotlin/net/corda/testing/driver/Driver.kt | 3 +- .../kotlin/net/corda/testing/node/MockNode.kt | 14 + 32 files changed, 1048 insertions(+), 235 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/SSHServerTest.kt create mode 100644 node/src/main/kotlin/net/corda/node/shell/CordaAuthenticationPlugin.kt create mode 100644 node/src/main/kotlin/net/corda/node/shell/CordaSSHAuthInfo.kt create mode 100644 node/src/main/kotlin/net/corda/node/shell/RPCOpsWithContext.kt diff --git a/build.gradle b/build.gradle index 878df9405e..58c7c31ec2 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,7 @@ buildscript { ext.dependency_checker_version = '3.0.1' ext.commons_collections_version = '4.1' ext.beanutils_version = '1.9.3' + ext.crash_version = 'faba68332800f21278c5b600bf14ad55cef5989e' // Update 121 is required for ObjectInputFilter and at time of writing 131 was latest: ext.java8_minUpdateVersion = '131' diff --git a/constants.properties b/constants.properties index 214be59311..f278032d63 100644 --- a/constants.properties +++ b/constants.properties @@ -1,4 +1,4 @@ -gradlePluginsVersion=2.0.8 +gradlePluginsVersion=2.0.9 kotlinVersion=1.1.60 guavaVersion=21.0 bouncycastleVersion=1.57 diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 322a9f9925..8c9fba60de 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -348,6 +348,20 @@ abstract class FlowLogic { } } + fun trackStepsTreeIndex(): DataFeed? { + // TODO this is not threadsafe, needs an atomic get-step-and-subscribe + return progressTracker?.let { + DataFeed(it.stepsTreeIndex, it.stepsTreeIndexChanges) + } + } + + fun trackStepsTree(): DataFeed>, List>>? { + // TODO this is not threadsafe, needs an atomic get-step-and-subscribe + return progressTracker?.let { + DataFeed(it.allStepsLabels, it.stepsTreeChanges) + } + } + /** * Suspends the flow until the transaction with the specified ID is received, successfully verified and * sent to the vault for processing. Note that this call suspends until the transaction is considered diff --git a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt index ee241d1924..90ba7e2596 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/FlowHandle.kt @@ -31,6 +31,9 @@ interface FlowHandle : AutoCloseable { interface FlowProgressHandle : FlowHandle { val progress: Observable + val stepsTreeIndexFeed: DataFeed? + + val stepsTreeFeed: DataFeed>, List>>? /** * Use this function for flows whose returnValue and progress are not going to be used or tracked, so as to free up * server resources. @@ -52,10 +55,17 @@ data class FlowHandleImpl( } @CordaSerializable -data class FlowProgressHandleImpl( +data class FlowProgressHandleImpl @JvmOverloads constructor( override val id: StateMachineRunId, override val returnValue: CordaFuture, - override val progress: Observable) : FlowProgressHandle { + override val progress: Observable, + override val stepsTreeIndexFeed: DataFeed? = null, + override val stepsTreeFeed: DataFeed>, List>>? = null) : FlowProgressHandle { + + // For API compatibility + fun copy(id: StateMachineRunId, returnValue: CordaFuture, progress: Observable): FlowProgressHandleImpl { + return copy(id = id, returnValue = returnValue, progress = progress, stepsTreeFeed = null, stepsTreeIndexFeed = null) + } // Remember to add @Throws to FlowProgressHandle.close() if this throws an exception. override fun close() { diff --git a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt index 6cd9526795..22661eeb7b 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/ProgressTracker.kt @@ -6,9 +6,6 @@ import rx.Subscription import rx.subjects.PublishSubject import java.util.* -// TODO: Expose the concept of errors. -// TODO: It'd be helpful if this class was at least partly thread safe. - /** * A progress tracker helps surface information about the progress of an operation to a user interface or API of some * kind. It lets you define a set of _steps_ that represent an operation. A step is represented by an object (typically @@ -34,16 +31,16 @@ import java.util.* @CordaSerializable class ProgressTracker(vararg steps: Step) { @CordaSerializable - sealed class Change { - data class Position(val tracker: ProgressTracker, val newStep: Step) : Change() { + sealed class Change(val progressTracker: ProgressTracker) { + data class Position(val tracker: ProgressTracker, val newStep: Step) : Change(tracker) { override fun toString() = newStep.label } - data class Rendering(val tracker: ProgressTracker, val ofStep: Step) : Change() { + data class Rendering(val tracker: ProgressTracker, val ofStep: Step) : Change(tracker) { override fun toString() = ofStep.label } - data class Structural(val tracker: ProgressTracker, val parent: Step) : Change() { + data class Structural(val tracker: ProgressTracker, val parent: Step) : Change(tracker) { override fun toString() = "Structural step change in child of ${parent.label}" } } @@ -70,17 +67,23 @@ class ProgressTracker(vararg steps: Step) { override fun equals(other: Any?) = other is DONE } - /** The steps in this tracker, same as the steps passed to the constructor but with UNSTARTED and DONE inserted. */ - val steps = arrayOf(UNSTARTED, *steps, DONE) - - // This field won't be serialized. - private val _changes by transient { PublishSubject.create() } - @CordaSerializable private data class Child(val tracker: ProgressTracker, @Transient val subscription: Subscription?) private val childProgressTrackers = mutableMapOf() + /** The steps in this tracker, same as the steps passed to the constructor but with UNSTARTED and DONE inserted. */ + val steps = arrayOf(UNSTARTED, *steps, DONE) + + private var _allStepsCache: List> = _allSteps() + + // This field won't be serialized. + private val _changes by transient { PublishSubject.create() } + private val _stepsTreeChanges by transient { PublishSubject.create>>() } + private val _stepsTreeIndexChanges by transient { PublishSubject.create() } + + + init { steps.forEach { val childTracker = it.childProgressTracker() @@ -92,7 +95,15 @@ class ProgressTracker(vararg steps: Step) { /** The zero-based index of the current step in the [steps] array (i.e. with UNSTARTED and DONE) */ var stepIndex: Int = 0 - private set + private set(value) { + field = value + } + + var stepsTreeIndex: Int = -1 + private set(value) { + field = value + _stepsTreeIndexChanges.onNext(value) + } /** * Reading returns the value of steps[stepIndex], writing moves the position of the current tracker. Once moved to @@ -118,22 +129,39 @@ class ProgressTracker(vararg steps: Step) { curChangeSubscription?.unsubscribe() stepIndex = index _changes.onNext(Change.Position(this, steps[index])) - curChangeSubscription = currentStep.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) }) + recalculateStepsTreeIndex() + curChangeSubscription = currentStep.changes.subscribe({ + _changes.onNext(it) + if (it is Change.Structural || it is Change.Rendering) rebuildStepsTree() else recalculateStepsTreeIndex() + }, { _changes.onError(it) }) - if (currentStep == DONE) _changes.onCompleted() + if (currentStep == DONE) { + _changes.onCompleted() + _stepsTreeIndexChanges.onCompleted() + _stepsTreeChanges.onCompleted() + } } /** Returns the current step, descending into children to find the deepest step we are up to. */ val currentStepRecursive: Step get() = getChildProgressTracker(currentStep)?.currentStepRecursive ?: currentStep + private fun currentStepRecursiveWithoutUnstarted(): Step { + val stepRecursive = getChildProgressTracker(currentStep)?.currentStepRecursive + return if (stepRecursive == null || stepRecursive == UNSTARTED) currentStep else stepRecursive + } + fun getChildProgressTracker(step: Step): ProgressTracker? = childProgressTrackers[step]?.tracker fun setChildProgressTracker(step: ProgressTracker.Step, childProgressTracker: ProgressTracker) { - val subscription = childProgressTracker.changes.subscribe({ _changes.onNext(it) }, { _changes.onError(it) }) + val subscription = childProgressTracker.changes.subscribe({ + _changes.onNext(it) + if (it is Change.Structural || it is Change.Rendering) rebuildStepsTree() else recalculateStepsTreeIndex() + }, { _changes.onError(it) }) childProgressTrackers[step] = Child(childProgressTracker, subscription) childProgressTracker.parent = this _changes.onNext(Change.Structural(this, step)) + rebuildStepsTree() } private fun removeChildProgressTracker(step: ProgressTracker.Step) { @@ -142,6 +170,7 @@ class ProgressTracker(vararg steps: Step) { it.subscription?.unsubscribe() } _changes.onNext(Change.Structural(this, step)) + rebuildStepsTree() } /** @@ -166,6 +195,18 @@ class ProgressTracker(vararg steps: Step) { return cursor } + private fun rebuildStepsTree() { + _allStepsCache = _allSteps() + _stepsTreeChanges.onNext(allStepsLabels) + + recalculateStepsTreeIndex() + } + + private fun recalculateStepsTreeIndex() { + val step = currentStepRecursiveWithoutUnstarted() + stepsTreeIndex = _allStepsCache.indexOfFirst { it.second == step } + } + private fun _allSteps(level: Int = 0): List> { val result = ArrayList>() for (step in steps) { @@ -177,11 +218,15 @@ class ProgressTracker(vararg steps: Step) { return result } + private fun _allStepsLabels(level: Int = 0): List> = _allSteps(level).map { Pair(it.first, it.second.label) } + /** * A list of all steps in this ProgressTracker and the children, with the indent level provided starting at zero. * Note that UNSTARTED is never counted, and DONE is only counted at the calling level. */ - val allSteps: List> get() = _allSteps() + val allSteps: List> get() = _allStepsCache + + val allStepsLabels: List> get() = _allStepsLabels() private var curChangeSubscription: Subscription? = null @@ -200,8 +245,15 @@ class ProgressTracker(vararg steps: Step) { */ val changes: Observable get() = _changes + val stepsTreeChanges: Observable>> get() = _stepsTreeChanges + + val stepsTreeIndexChanges: Observable get() = _stepsTreeIndexChanges + /** Returns true if the progress tracker has ended, either by reaching the [DONE] step or prematurely with an error */ val hasEnded: Boolean get() = _changes.hasCompleted() || _changes.hasThrowable() } +// TODO: Expose the concept of errors. +// TODO: It'd be helpful if this class was at least partly thread safe. + diff --git a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt index f307a2375e..9bcf5c0fb5 100644 --- a/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt +++ b/core/src/test/kotlin/net/corda/core/utilities/ProgressTrackerTest.kt @@ -5,6 +5,7 @@ import org.junit.Test import java.util.* import kotlin.test.assertEquals import kotlin.test.assertFails +import org.assertj.core.api.Assertions.* class ProgressTrackerTest { object SimpleSteps { @@ -24,13 +25,23 @@ class ProgressTrackerTest { fun tracker() = ProgressTracker(AYY, BEE, SEA) } + object BabySteps { + object UNOS : ProgressTracker.Step("unos") + object DOES : ProgressTracker.Step("does") + object TRES : ProgressTracker.Step("tres") + + fun tracker() = ProgressTracker(UNOS, DOES, TRES) + } + lateinit var pt: ProgressTracker lateinit var pt2: ProgressTracker + lateinit var pt3: ProgressTracker @Before fun before() { pt = SimpleSteps.tracker() pt2 = ChildSteps.tracker() + pt3 = BabySteps.tracker() } @Test @@ -81,6 +92,118 @@ class ProgressTrackerTest { assertEquals(ChildSteps.BEE, pt2.nextStep()) } + @Test + fun `steps tree index counts children steps`() { + pt.setChildProgressTracker(SimpleSteps.TWO, pt2) + + val allSteps = pt.allSteps + + //capture notifications + val stepsIndexNotifications = LinkedList() + pt.stepsTreeIndexChanges.subscribe { + stepsIndexNotifications += it + } + val stepsTreeNotification = LinkedList>>() + pt.stepsTreeChanges.subscribe { + stepsTreeNotification += it + } + + fun assertCurrentStepsTree(index:Int, step: ProgressTracker.Step) { + assertEquals(index, pt.stepsTreeIndex) + assertEquals(step, allSteps[pt.stepsTreeIndex].second) + } + + //travel tree + pt.currentStep = SimpleSteps.ONE + assertCurrentStepsTree(0, SimpleSteps.ONE) + + pt.currentStep = SimpleSteps.TWO + assertCurrentStepsTree(1, SimpleSteps.TWO) + + pt2.currentStep = ChildSteps.BEE + assertCurrentStepsTree(3, ChildSteps.BEE) + + pt.currentStep = SimpleSteps.THREE + assertCurrentStepsTree(5, SimpleSteps.THREE) + + //assert no structure changes and proper steps propagation + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(0, 1, 3, 5)) + assertThat(stepsTreeNotification).isEmpty() + } + + @Test + fun `structure changes are pushed down when progress trackers are added`() { + pt.setChildProgressTracker(SimpleSteps.TWO, pt2) + + //capture notifications + val stepsIndexNotifications = LinkedList() + pt.stepsTreeIndexChanges.subscribe { + stepsIndexNotifications += it + } + + //put current state as a first change for simplicity when asserting + val stepsTreeNotification = mutableListOf(pt.allStepsLabels) + println(pt.allStepsLabels) + pt.stepsTreeChanges.subscribe { + stepsTreeNotification += it + } + + fun assertCurrentStepsTree(index:Int, step: ProgressTracker.Step) { + assertEquals(index, pt.stepsTreeIndex) + assertEquals(step.label, stepsTreeNotification.last()[pt.stepsTreeIndex].second) + } + + pt.currentStep = SimpleSteps.TWO + assertCurrentStepsTree(1, SimpleSteps.TWO) + + pt.currentStep = SimpleSteps.FOUR + assertCurrentStepsTree(6, SimpleSteps.FOUR) + + + pt.setChildProgressTracker(SimpleSteps.THREE, pt3) + + assertCurrentStepsTree(9, SimpleSteps.FOUR) + + //assert no structure changes and proper steps propagation + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 6, 9)) + assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state + } + + @Test + fun `structure changes are pushed down when progress trackers are removed`() { + pt.setChildProgressTracker(SimpleSteps.TWO, pt2) + + //capture notifications + val stepsIndexNotifications = LinkedList() + pt.stepsTreeIndexChanges.subscribe { + stepsIndexNotifications += it + } + + //put current state as a first change for simplicity when asserting + val stepsTreeNotification = mutableListOf(pt.allStepsLabels) + pt.stepsTreeChanges.subscribe { + stepsTreeNotification += it + } + + fun assertCurrentStepsTree(index:Int, step: ProgressTracker.Step) { + assertEquals(index, pt.stepsTreeIndex) + assertEquals(step.label, stepsTreeNotification.last()[pt.stepsTreeIndex].second) + } + + pt.currentStep = SimpleSteps.TWO + pt2.currentStep = ChildSteps.SEA + pt3.currentStep = BabySteps.UNOS + assertCurrentStepsTree(4, ChildSteps.SEA) + + pt.setChildProgressTracker(SimpleSteps.TWO, pt3) + + assertCurrentStepsTree(2, BabySteps.UNOS) + + //assert no structure changes and proper steps propagation + assertThat(stepsIndexNotifications).containsExactlyElementsOf(listOf(1, 4, 2)) + assertThat(stepsTreeNotification).hasSize(2) // 1 change + 1 our initial state + } + @Test fun `can be rewound`() { pt.setChildProgressTracker(SimpleSteps.TWO, pt2) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 11b2632071..a8986d15eb 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,9 +6,12 @@ from the previous milestone release. UNRELEASED ---------- + * ``AttachmentStorage`` now allows providing metadata on attachments upload - username and filename, currently as plain strings. Those can be then used for querying, utilizing ``queryAttachments`` method of the same interface. +* ``SSH Server`` - The node can now expose shell via SSH server with proper authorization and permissioning built in. + * ``CordaRPCOps`` implementation now checks permissions for any function invocation, rather than just when starting flows. * ``wellKnownPartyFromAnonymous()`` now always resolve the key to a ``Party``, then the party to the well known party. diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 4df92c64b4..0531431cc5 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -152,3 +152,8 @@ path to the node's base directory. :jarDirs: An optional list of file system directories containing JARs to include in the classpath when launching via ``corda.jar`` only. Each should be a string. Only the JARs in the directories are added, not the directories themselves. This is useful for including JDBC drivers and the like. e.g. ``jarDirs = [ 'lib' ]`` + +:sshd: If provided, node will start internal SSH server which will provide a management shell. It uses the same credentials + and permissions as RPC subsystem. It has one required parameter. + + :port: - the port to start SSH server on diff --git a/docs/source/hello-world-running.rst b/docs/source/hello-world-running.rst index 5504b83b65..17d506471d 100644 --- a/docs/source/hello-world-running.rst +++ b/docs/source/hello-world-running.rst @@ -33,15 +33,16 @@ service. rpcPort 10006 webPort 10007 cordapps = ["net.corda:corda-finance:$corda_release_version"] - rpcUsers = [[ user: "user1", "password": "test", "permissions": []]] + rpcUsers = [[ user: "user1", "password": "test", "permissions": ["ALL]]] } node { name "O=PartyB,L=New York,C=US" p2pPort 10008 rpcPort 10009 webPort 10010 + sshdPort 10024 cordapps = ["net.corda:corda-finance:$corda_release_version"] - rpcUsers = [[ user: "user1", "password": "test", "permissions": []]] + rpcUsers = [[ user: "user1", "password": "test", "permissions": ["ALL"]]] } } @@ -101,6 +102,9 @@ node via its built-in CRaSH shell. Go to the terminal window displaying the CRaSH shell of PartyA. Typing ``help`` will display a list of the available commands. +.. note:: Local terminal shell is available only in a development mode. In production environment SSH server can be enabled. + More about SSH and how to connect can be found on :doc:`Shell` page. + We want to create an IOU of 100 with PartyB. We start the ``IOUFlow`` by typing: .. code:: bash diff --git a/docs/source/node-administration.rst b/docs/source/node-administration.rst index 1b69ad9eae..7a677c34d2 100644 --- a/docs/source/node-administration.rst +++ b/docs/source/node-administration.rst @@ -53,6 +53,11 @@ reserve the right to move and rename it as it's not part of the public API as ye logging name construction. If you can't find what you need to refer to, use the ``--logging-level`` option as above and then determine the logging module name from the console output. +SSH access +---------- + +Node can be configured to run SSH server. See :doc:`shell` for details. + Database access --------------- diff --git a/docs/source/shell.rst b/docs/source/shell.rst index cdddc4548e..fe74e44f29 100644 --- a/docs/source/shell.rst +++ b/docs/source/shell.rst @@ -18,11 +18,47 @@ Some of its features include: * View JMX metrics and monitoring exports. * UNIX style pipes for both text and objects, an ``egrep`` command and a command for working with columnular data. -.. note:: A future version of Corda will add SSH access to the node. - It is based on the popular `CRaSH`_ shell used in various other projects and supports many of the same features. -The shell may be disabled by passing the ``--no-local-shell`` flag to the node. +Local terminal shell runs only in development mode. It may be disabled by passing the ``--no-local-shell`` flag to the node. + +SSH server +---------- + +Shell can also be accessible via SSH. By default SSH server is *disabled*. To enable it port must be configured - in ``node.conf`` file + +.. code:: bash + + sshd { + port = 2222 + } + +Authentication and authorization +-------------------------------- +SSH require user to login first - using the same users as RPC system. In fact, shell serves as a proxy to RPC and communicates +with node using RPC calls. This also means that RPC permissions are enforced. No permissions are required to allow the connection +and login in. +Watching flows (``flow watch``) requires ``InvokeRpc.stateMachinesFeed`` while starting flows requires +``InvokeRpc.startTrackedFlowDynamic`` and ``InvokeRpc.registeredFlows`` in addition to a permission for a particular flow. + +Host key +-------- + +The host key is loaded from ``sshkey/hostkey.pem`` file. If the file does not exist, it will be generated randomly, however +in the development mode seed may be tuned to give the same results on the same computer - in order to avoid host checking +errors. + +Connecting +---------- + +Linux and MacOS computers usually come with SSH client preinstalled. On Windows it usually require extra download. +Usual connection syntax is ``ssh user@host -p 2222`` - where ``user`` is a RPC username, and ``-p`` specifies a port parameters - +it's the same as setup in ``node.conf`` file. ``host`` should point to a node hostname, usually ``localhost`` if connecting and +running node on the same computer. Password will be asked after establishing connection. + +:note: While developing, checking multiple samples or simply restarting a node frequently host key may be regenerated. SSH usually + saved once trusted hosts and will refuse to connect in case of a change. Then check may be disabled with extra options + ``ssh -o StrictHostKeyChecking=no user@host -p2222``. This option should never be used in production environment! Getting help ------------ diff --git a/finance/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt b/finance/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt index 95a58e8cfb..04a3c1cd90 100644 --- a/finance/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt +++ b/finance/src/main/kotlin/net/corda/finance/flows/CashIssueAndPaymentFlow.kt @@ -27,7 +27,7 @@ class CashIssueAndPaymentFlow(val amount: Amount, val anonymous: Boolean, val notary: Party, progressTracker: ProgressTracker) : AbstractCashFlow(progressTracker) { - constructor(amount: Amount, + constructor(amount: Amount, issueRef: OpaqueBytes, recipient: Party, anonymous: Boolean, diff --git a/gradle-plugins/cordformation/src/main/kotlin/net/corda/plugins/Node.kt b/gradle-plugins/cordformation/src/main/kotlin/net/corda/plugins/Node.kt index f03e650772..9358a293b5 100644 --- a/gradle-plugins/cordformation/src/main/kotlin/net/corda/plugins/Node.kt +++ b/gradle-plugins/cordformation/src/main/kotlin/net/corda/plugins/Node.kt @@ -96,15 +96,15 @@ class Node(private val project: Project) : CordformNode() { } /** - * Set the SSHD port for this node. + * Enables SSH access on given port * - * @param sshdPort The SSHD port. + * @param sshdPort The port for SSH server to listen on */ - fun sshdPort(sshdPort: Int) { - config = config.withValue("sshdAddress", - ConfigValueFactory.fromAnyRef("$DEFAULT_HOST:$sshdPort")) + fun sshdPort(sshdPort: Int?) { + config = config.withValue("sshd.port", ConfigValueFactory.fromAnyRef(sshdPort)) } + internal fun build() { configureProperties() installCordaJar() diff --git a/node/build.gradle b/node/build.gradle index e1896cfd28..7e9adec300 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -139,8 +139,14 @@ dependencies { compile "io.netty:netty-all:$netty_version" // CRaSH: An embeddable monitoring and admin shell with support for adding new commands written in Groovy. - compile("com.github.corda.crash:crash.shell:d5da86ba1b38e9c33af2a621dd15ba286307bec4") { + compile("com.github.corda.crash:crash.shell:$crash_version") { exclude group: "org.slf4j", module: "slf4j-jdk14" + exclude group: "org.bouncycastle" + } + + compile("com.github.corda.crash:crash.connectors.ssh:$crash_version") { + exclude group: "org.slf4j", module: "slf4j-jdk14" + exclude group: "org.bouncycastle" } // OkHTTP: Simple HTTP library. @@ -157,6 +163,9 @@ dependencies { integrationTestCompile "junit:junit:$junit_version" integrationTestCompile "org.assertj:assertj-core:${assertj_version}" + // Jsh: Testing SSH server + integrationTestCompile group: 'com.jcraft', name: 'jsch', version: '0.1.54' + // Jetty dependencies for NetworkMapClient test. // Web stuff: for HTTP[S] servlets testCompile "org.eclipse.jetty:jetty-servlet:${jetty_version}" diff --git a/node/src/integration-test/kotlin/net/corda/node/SSHServerTest.kt b/node/src/integration-test/kotlin/net/corda/node/SSHServerTest.kt new file mode 100644 index 0000000000..e7346a1932 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/SSHServerTest.kt @@ -0,0 +1,170 @@ +package net.corda.node + +import co.paralleluniverse.fibers.Suspendable +import com.jcraft.jsch.ChannelExec +import com.jcraft.jsch.JSch +import com.jcraft.jsch.JSchException +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.nodeapi.User +import net.corda.testing.ALICE +import net.corda.testing.driver.driver +import org.bouncycastle.util.io.Streams +import org.junit.Test +import net.corda.node.services.Permissions.Companion.startFlow +import java.net.ConnectException +import kotlin.test.assertTrue +import kotlin.test.fail +import org.assertj.core.api.Assertions.assertThat +import java.util.regex.Pattern + +class SSHServerTest { + + @Test() + fun `ssh server does not start be default`() { + val user = User("u", "p", setOf()) + // The driver will automatically pick up the annotated flows below + driver() { + val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user)) + node.getOrThrow() + + val session = JSch().getSession("u", "localhost", 2222) + session.setConfig("StrictHostKeyChecking", "no") + session.setPassword("p") + + try { + session.connect() + fail() + } catch (e:JSchException) { + assertTrue(e.cause is ConnectException) + } + } + } + + @Test + fun `ssh server starts when configured`() { + val user = User("u", "p", setOf()) + // The driver will automatically pick up the annotated flows below + driver { + val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user), + customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + node.getOrThrow() + + val session = JSch().getSession("u", "localhost", 2222) + session.setConfig("StrictHostKeyChecking", "no") + session.setPassword("p") + + session.connect() + + assertTrue(session.isConnected) + } + } + + + @Test + fun `ssh server verify credentials`() { + val user = User("u", "p", setOf()) + // The driver will automatically pick up the annotated flows below + driver { + val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user), + customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + node.getOrThrow() + + val session = JSch().getSession("u", "localhost", 2222) + session.setConfig("StrictHostKeyChecking", "no") + session.setPassword("p_is_bad_password") + + try { + session.connect() + fail("Server should reject invalid credentials") + } catch (e: JSchException) { + //There is no specialized exception for this + assertTrue(e.message == "Auth fail") + } + } + } + + @Test + fun `ssh respects permissions`() { + val user = User("u", "p", setOf(startFlow())) + // The driver will automatically pick up the annotated flows below + driver(isDebug = true) { + val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user), + customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + node.getOrThrow() + + val session = JSch().getSession("u", "localhost", 2222) + session.setConfig("StrictHostKeyChecking", "no") + session.setPassword("p") + session.connect() + + assertTrue(session.isConnected) + + val channel = session.openChannel("exec") as ChannelExec + channel.setCommand("start FlowICannotRun otherParty: \"O=Alice Corp,L=Madrid,C=ES\"") + channel.connect() + val response = String(Streams.readAll(channel.inputStream)) + + val flowNameEscaped = Pattern.quote("StartFlow.${SSHServerTest::class.qualifiedName}$${FlowICannotRun::class.simpleName}") + + channel.disconnect() + session.disconnect() + + assertThat(response).matches("(?s)User not permissioned with any of \\[[^]]*${flowNameEscaped}.*") + } + } + + @Test + fun `ssh runs flows`() { + val user = User("u", "p", setOf(startFlow())) + // The driver will automatically pick up the annotated flows below + driver(isDebug = true) { + val node = startNode(providedName = ALICE.name, rpcUsers = listOf(user), + customOverrides = mapOf("sshd" to mapOf("port" to 2222))) + node.getOrThrow() + + val session = JSch().getSession("u", "localhost", 2222) + session.setConfig("StrictHostKeyChecking", "no") + session.setPassword("p") + session.connect() + + assertTrue(session.isConnected) + + val channel = session.openChannel("exec") as ChannelExec + channel.setCommand("start FlowICanRun") + channel.connect() + + val response = String(Streams.readAll(channel.inputStream)) + + //There are ANSI control characters involved, so we want to avoid direct byte to byte matching + assertThat(response.lines()).filteredOn( { it.contains("✓") && it.contains("Done")}).hasSize(1) + } + } + + @StartableByRPC + @InitiatingFlow + class FlowICanRun : FlowLogic() { + + private val HELLO_STEP = ProgressTracker.Step("Hello") + + @Suspendable + override fun call(): String { + progressTracker?.currentStep = HELLO_STEP + return "bambam" + } + + override val progressTracker: ProgressTracker? = ProgressTracker(HELLO_STEP) + } + + @StartableByRPC + @InitiatingFlow + class FlowICannotRun(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): String = initiateFlow(otherParty).receive().unwrap { it } + + override val progressTracker: ProgressTracker? = ProgressTracker() + } +} \ No newline at end of file diff --git a/node/src/main/java/net/corda/node/shell/FlowShellCommand.java b/node/src/main/java/net/corda/node/shell/FlowShellCommand.java index 4bac0ce5b5..d3ed752531 100644 --- a/node/src/main/java/net/corda/node/shell/FlowShellCommand.java +++ b/node/src/main/java/net/corda/node/shell/FlowShellCommand.java @@ -2,6 +2,9 @@ package net.corda.node.shell; // See the comments at the top of run.java +import net.corda.core.messaging.CordaRPCOps; +import net.corda.node.utilities.ANSIProgressRenderer; +import net.corda.node.utilities.CRaSHNSIProgressRenderer; import org.crsh.cli.*; import org.crsh.command.*; import org.crsh.text.*; @@ -9,6 +12,7 @@ import org.crsh.text.ui.TableElement; import java.util.*; +import static net.corda.node.services.messaging.RPCServerKt.CURRENT_RPC_CONTEXT; import static net.corda.node.shell.InteractiveShell.*; @Man( @@ -25,25 +29,27 @@ public class FlowShellCommand extends InteractiveShellCommand { @Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name, @Usage("The data to pass as input") @Argument(unquote = false) List input ) { - startFlow(name, input, out); + startFlow(name, input, out, ops(), ansiProgressRenderer()); } // TODO Limit number of flows shown option? @Command @Usage("watch information about state machines running on the node with result information") public void watch(InvocationContext context) throws Exception { - runStateMachinesView(out); + runStateMachinesView(out, ops()); } static void startFlow(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name, @Usage("The data to pass as input") @Argument(unquote = false) List input, - RenderPrintWriter out) { + RenderPrintWriter out, + CordaRPCOps rpcOps, + ANSIProgressRenderer ansiProgressRenderer) { if (name == null) { out.println("You must pass a name for the flow, see 'man flow'", Color.red); return; } String inp = input == null ? "" : String.join(" ", input).trim(); - runFlowByNameFragment(name, inp, out); + runFlowByNameFragment(name, inp, out, rpcOps, ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out) ); } @Command diff --git a/node/src/main/java/net/corda/node/shell/StartShellCommand.java b/node/src/main/java/net/corda/node/shell/StartShellCommand.java index 3ec2e8e2ee..e1c91ebb75 100644 --- a/node/src/main/java/net/corda/node/shell/StartShellCommand.java +++ b/node/src/main/java/net/corda/node/shell/StartShellCommand.java @@ -2,6 +2,8 @@ package net.corda.node.shell; // A simple forwarder to the "flow start" command, for easier typing. +import net.corda.node.utilities.ANSIProgressRenderer; +import net.corda.node.utilities.CRaSHNSIProgressRenderer; import org.crsh.cli.*; import java.util.*; @@ -11,6 +13,7 @@ public class StartShellCommand extends InteractiveShellCommand { @Man("An alias for 'flow start'. Example: \"start Yo target: Some other company\"") public void main(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name, @Usage("The data to pass as input") @Argument(unquote = false) List input) { - FlowShellCommand.startFlow(name, input, out); + ANSIProgressRenderer ansiProgressRenderer = ansiProgressRenderer(); + FlowShellCommand.startFlow(name, input, out, ops(), ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHNSIProgressRenderer(out)); } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 14a8179d19..d6aad5cf45 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -36,6 +36,7 @@ import net.corda.node.internal.cordapp.CordappProviderInternal import net.corda.node.services.ContractUpgradeHandler import net.corda.node.services.FinalityHandler import net.corda.node.services.NotaryChangeHandler +import net.corda.node.services.RPCUserService import net.corda.node.services.api.* import net.corda.node.services.config.BFTSMaRtConfiguration import net.corda.node.services.config.NodeConfiguration @@ -55,6 +56,7 @@ import net.corda.node.services.transactions.* import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSoftLockManager +import net.corda.node.shell.InteractiveShell import net.corda.node.utilities.* import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.Logger @@ -130,6 +132,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val _nodeReadyFuture = openFuture() protected val networkMapClient: NetworkMapClient? by lazy { configuration.compatibilityZoneURL?.let(::NetworkMapClient) } + lateinit var userService: RPCUserService get + /** Completes once the node has successfully registered with the network map service * or has loaded network map data from local database */ val nodeReadyFuture: CordaFuture @@ -213,6 +217,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader runOnStop += network::stop + + startShell(rpcOps) + Pair(StartedNodeImpl(this, _services, info, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } @@ -243,6 +250,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } + open fun startShell(rpcOps: CordaRPCOps) { + InteractiveShell.startShell(configuration, rpcOps, userService, _services.identityService, _services.database) + } + private fun initNodeInfo(): Pair, NodeInfo> { val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val keyPairs = mutableSetOf(identityKeyPair) diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index d28c8b67ab..660ce4b767 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -142,7 +142,9 @@ internal class CordaRPCOpsImpl( return FlowProgressHandleImpl( id = stateMachine.id, returnValue = stateMachine.resultFuture, - progress = stateMachine.logic.track()?.updates ?: Observable.empty() + progress = stateMachine.logic.track()?.updates ?: Observable.empty(), + stepsTreeIndexFeed = stateMachine.logic.trackStepsTreeIndex(), + stepsTreeFeed = stateMachine.logic.trackStepsTree() ) } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 99af1e7965..2f378ed7ba 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -131,7 +131,6 @@ open class Node(configuration: NodeConfiguration, private var shutdownHook: ShutdownHook? = null - private lateinit var userService: RPCUserService override fun makeMessagingService(database: CordaPersistence, info: NodeInfo): MessagingService { userService = RPCUserServiceImpl(configuration.rpcUsers) diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 4ed9546140..2947700025 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -117,12 +117,13 @@ open class NodeStartup(val args: Array) { Node.printBasicNodeInfo("Node for \"$name\" started up and registered in $elapsed sec") // Don't start the shell if there's no console attached. - val runShell = !cmdlineOptions.noLocalShell && System.console() != null - startedNode.internals.startupComplete.then { - try { - InteractiveShell.startShell(cmdlineOptions.baseDirectory, runShell, cmdlineOptions.sshdServer, startedNode) - } catch (e: Throwable) { - logger.error("Shell failed to start", e) + if (!cmdlineOptions.noLocalShell && System.console() != null && conf.devMode) { + startedNode.internals.startupComplete.then { + try { + InteractiveShell.runLocalShell(startedNode) + } catch (e: Throwable) { + logger.error("Shell failed to start", e) + } } } }, @@ -317,7 +318,6 @@ open class NodeStartup(val args: Array) { a("--- ${versionInfo.vendor} ${versionInfo.releaseVersion} (${versionInfo.revision.take(7)}) -----------------------------------------------"). newline(). newline(). - newline(). reset()) } } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index cffde21e21..607f3927b8 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -39,6 +39,7 @@ interface NodeConfiguration : NodeSSLConfiguration { // TODO Move into DevModeOptions val useTestClock: Boolean get() = false val detectPublicIp: Boolean get() = true + val sshd: SSHDConfiguration? } fun NodeConfiguration.shouldCheckCheckpoints(): Boolean { @@ -109,7 +110,9 @@ data class NodeConfigurationImpl( override val detectPublicIp: Boolean = true, override val activeMQServer: ActiveMqServerConfiguration, // TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration - override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis() + override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), + override val sshd: SSHDConfiguration? = null + ) : NodeConfiguration { override val exportJMXto: String get() = "http" @@ -144,3 +147,5 @@ data class CertChainPolicyConfig(val role: String, private val policy: CertChain } } } + +data class SSHDConfiguration(val port: Int) \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt index 8daa20f2a3..58cd73de22 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt @@ -23,6 +23,7 @@ data class RpcPermissions(private val values: Set = emptySet()) { companion object { val NONE = RpcPermissions() + val ALL = RpcPermissions(setOf("ALL")) } fun coverAny(permissions: Set) = !values.intersect(permissions + Permissions.all()).isEmpty() diff --git a/node/src/main/kotlin/net/corda/node/shell/CordaAuthenticationPlugin.kt b/node/src/main/kotlin/net/corda/node/shell/CordaAuthenticationPlugin.kt new file mode 100644 index 0000000000..9854a9caf4 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/shell/CordaAuthenticationPlugin.kt @@ -0,0 +1,35 @@ +package net.corda.node.shell + +import net.corda.core.context.Actor +import net.corda.core.context.InvocationContext +import net.corda.core.identity.CordaX500Name +import net.corda.core.messaging.CordaRPCOps +import net.corda.node.services.RPCUserService +import net.corda.node.services.messaging.RpcPermissions +import org.crsh.auth.AuthInfo +import org.crsh.auth.AuthenticationPlugin +import org.crsh.plugin.CRaSHPlugin + +class CordaAuthenticationPlugin(val rpcOps:CordaRPCOps, val userService:RPCUserService, val nodeLegalName:CordaX500Name) : CRaSHPlugin>(), AuthenticationPlugin { + + override fun getImplementation(): AuthenticationPlugin = this + + override fun getName(): String = "corda" + + override fun authenticate(username: String?, credential: String?): AuthInfo { + if (username == null || credential == null) { + return AuthInfo.UNSUCCESSFUL + } + + val user = userService.getUser(username) + + if (user != null && user.password == credential) { + val actor = Actor(Actor.Id(username), userService.id, nodeLegalName) + return CordaSSHAuthInfo(true, RPCOpsWithContext(rpcOps, InvocationContext.rpc(actor), RpcPermissions(user.permissions))) + } + + return AuthInfo.UNSUCCESSFUL; + } + + override fun getCredentialType(): Class = String::class.java +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/shell/CordaSSHAuthInfo.kt b/node/src/main/kotlin/net/corda/node/shell/CordaSSHAuthInfo.kt new file mode 100644 index 0000000000..04bda1a4bb --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/shell/CordaSSHAuthInfo.kt @@ -0,0 +1,9 @@ +package net.corda.node.shell + +import net.corda.core.messaging.CordaRPCOps +import net.corda.node.utilities.ANSIProgressRenderer +import org.crsh.auth.AuthInfo + +class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo { + override fun isSuccessful(): Boolean = successful +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt index 9e66fd4195..d2ca683740 100644 --- a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt +++ b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt @@ -9,25 +9,32 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.google.common.io.Closeables import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.StringToMethodCallParser +import net.corda.client.rpc.PermissionException import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier import net.corda.core.flows.FlowLogic +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.* -import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.StateMachineUpdate import net.corda.core.utilities.getOrThrow +import net.corda.core.node.services.IdentityService +import net.corda.core.utilities.loggerFor import net.corda.node.internal.Node import net.corda.node.internal.StartedNode +import net.corda.node.services.RPCUserService +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT import net.corda.node.services.messaging.RpcAuthContext import net.corda.node.services.messaging.RpcPermissions -import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.utilities.ANSIProgressRenderer import net.corda.node.utilities.CordaPersistence +import net.corda.node.utilities.StdoutANSIProgressRenderer import org.crsh.command.InvocationContext import org.crsh.console.jline.JLineProcessor import org.crsh.console.jline.TerminalFactory @@ -77,59 +84,55 @@ object InteractiveShell { private lateinit var node: StartedNode @VisibleForTesting internal lateinit var database: CordaPersistence + private lateinit var rpcOps:CordaRPCOps + private lateinit var userService:RPCUserService + private lateinit var identityService:IdentityService + private var shell:Shell? = null + private lateinit var nodeLegalName: CordaX500Name /** * Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node * internals. */ - fun startShell(dir: Path, runLocalShell: Boolean, runSSHServer: Boolean, node: StartedNode) { - this.node = node - this.database = node.database - var runSSH = runSSHServer + fun startShell(configuration:NodeConfiguration, cordaRPCOps: CordaRPCOps, userService: RPCUserService, identityService: IdentityService, database:CordaPersistence) { + this.rpcOps = cordaRPCOps + this.userService = userService + this.identityService = identityService + this.nodeLegalName = configuration.myLegalName + this.database = database + val dir = configuration.baseDirectory + val runSshDeamon = configuration.sshd != null val config = Properties() - if (runSSH) { - // TODO: Finish and enable SSH access. - // This means bringing the CRaSH SSH plugin into the Corda tree and applying Marek's patches - // found in https://github.com/marekdapps/crash/commit/8a37ce1c7ef4d32ca18f6396a1a9d9841f7ff643 - // to that local copy, as CRaSH is no longer well maintained by the upstream and the SSH plugin - // that it comes with is based on a very old version of Apache SSHD which can't handle connections - // from newer SSH clients. It also means hooking things up to the authentication system. - Node.printBasicNodeInfo("SSH server access is not fully implemented, sorry.") - runSSH = false - } + if (runSshDeamon) { + val sshKeysDir = dir / "sshkey" + sshKeysDir.toFile().mkdirs() - if (runSSH) { // Enable SSH access. Note: these have to be strings, even though raw object assignments also work. - config["crash.ssh.keypath"] = (dir / "sshkey").toString() + config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString() config["crash.ssh.keygen"] = "true" - // config["crash.ssh.port"] = node.configuration.sshdAddress.port.toString() - config["crash.auth"] = "simple" - config["crash.auth.simple.username"] = "admin" - config["crash.auth.simple.password"] = "admin" + config["crash.ssh.port"] = configuration.sshd?.port.toString() + config["crash.auth"] = "corda" } ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java) - val shell = ShellLifecycle(dir).start(config) + shell = ShellLifecycle(dir).start(config) - if (runSSH) { - // printBasicNodeInfo("SSH server listening on address", node.configuration.sshdAddress.toString()) + if (runSshDeamon) { + Node.printBasicNodeInfo("SSH server listening on port", configuration.sshd!!.port.toString()) } + } - // Possibly bring up a local shell in the launching terminal window, unless it's disabled. - if (!runLocalShell) - return - // TODO: Automatically set up the JDBC sub-command with a connection to the database. + fun runLocalShell(node:StartedNode) { val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) InterruptHandler { jlineProcessor.interrupt() }.install() thread(name = "Command line shell processor", isDaemon = true) { // Give whoever has local shell access administrator access to the node. - // TODO remove this after Shell switches to RPC - val context = RpcAuthContext(net.corda.core.context.InvocationContext.shell(), RpcPermissions.NONE) + val context = RpcAuthContext(net.corda.core.context.InvocationContext.shell(), RpcPermissions.ALL) CURRENT_RPC_CONTEXT.set(context) Emoji.renderIfSupported { jlineProcessor.run() @@ -168,27 +171,25 @@ object InteractiveShell { // Don't use the Java language plugin (we may not have tools.jar available at runtime), this // will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that // is only the 'jmx' command. - return super.getPlugins().filterNot { it is JavaLanguage } + return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps, userService, nodeLegalName) } } val attributes = mapOf( - "node" to node.internals, - "services" to node.services, - "ops" to node.rpcOps, + "ops" to rpcOps, "mapper" to yamlInputMapper ) val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) - return context.getPlugin(ShellFactory::class.java).create(null) + return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, RPCOpsWithContext(rpcOps, net.corda.core.context.InvocationContext.shell(), RpcPermissions.ALL), StdoutANSIProgressRenderer)) } } private val yamlInputMapper: ObjectMapper by lazy { // Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra // serializers. - JacksonSupport.createInMemoryMapper(node.services.identityService, YAMLFactory(), true).apply { + JacksonSupport.createInMemoryMapper(identityService, YAMLFactory(), true).apply { val rpcModule = SimpleModule() rpcModule.addDeserializer(InputStream::class.java, InputStreamDeserializer) rpcModule.addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) @@ -217,42 +218,41 @@ object InteractiveShell { /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out * the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using - * the [runFlowFromString] method and starts the requested flow using the [ANSIProgressRenderer] to draw - * the progress tracker. Ctrl-C can be used to cancel. + * the [runFlowFromString] method and starts the requested flow. Ctrl-C can be used to cancel. */ @JvmStatic - fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter) { - val matches = node.services.rpcFlows.filter { nameFragment in it.name } + fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer) { + val matches = rpcOps.registeredFlows().filter { nameFragment in it } if (matches.isEmpty()) { output.println("No matching flow found, run 'flow list' to see your options.", Color.red) return } else if (matches.size > 1) { - output.println("Ambigous name provided, please be more specific. Your options are:") + output.println("Ambiguous name provided, please be more specific. Your options are:") matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) } return } - val clazz: Class> = uncheckedCast(matches.single()) + val clazz: Class> = uncheckedCast(Class.forName(matches.single())) try { - // TODO Flow invocation should use startFlowDynamic. - val context = net.corda.core.context.InvocationContext.shell() - val fsm = runFlowFromString({ node.services.startFlow(it, context).getOrThrow() }, inputData, clazz) // Show the progress tracker on the console until the flow completes or is interrupted with a // Ctrl-C keypress. + val stateObservable = runFlowFromString({ clazz,args -> rpcOps.startTrackedFlowDynamic (clazz, *args) }, inputData, clazz) + val latch = CountDownLatch(1) - ANSIProgressRenderer.onDone = { latch.countDown() } - ANSIProgressRenderer.progressTracker = (fsm as FlowStateMachineImpl).logic.progressTracker + ansiProgressRenderer.render(stateObservable, { latch.countDown() }) try { // Wait for the flow to end and the progress tracker to notice. By the time the latch is released // the tracker is done with the screen. latch.await() } catch (e: InterruptedException) { - ANSIProgressRenderer.progressTracker = null // TODO: When the flow framework allows us to kill flows mid-flight, do so here. } + } catch (e: NoApplicableConstructor) { output.println("No matching constructor found:", Color.red) e.errors.forEach { output.println("- $it", Color.red) } + } catch (e:PermissionException) { + output.println(e.message ?: "Access denied", Color.red) } finally { InputStreamDeserializer.closeAll() } @@ -273,10 +273,10 @@ object InteractiveShell { * @throws NoApplicableConstructor if no constructor could be found for the given set of types. */ @Throws(NoApplicableConstructor::class) - fun runFlowFromString(invoke: (FlowLogic<*>) -> FlowStateMachine<*>, + fun runFlowFromString(invoke: (Class>, Array) -> FlowProgressHandle, inputData: String, - clazz: Class>, - om: ObjectMapper = yamlInputMapper): FlowStateMachine<*> { + clazz: Class>, + om: ObjectMapper = yamlInputMapper): FlowProgressHandle { // For each constructor, attempt to parse the input data as a method call. Use the first that succeeds, // and keep track of the reasons we failed so we can print them out if no constructors are usable. val parser = StringToMethodCallParser(clazz, om) @@ -303,7 +303,7 @@ object InteractiveShell { errors.add("A flow must override the progress tracker in order to be run from the shell") continue } - return invoke(flow) + return invoke(clazz, args) } catch (e: StringToMethodCallParser.UnparseableCallException.MissingParameter) { errors.add("${getPrototype()}: missing parameter ${e.paramName}") } catch (e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) { @@ -321,8 +321,8 @@ object InteractiveShell { // TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense. @JvmStatic - fun runStateMachinesView(out: RenderPrintWriter): Any? { - val proxy = node.rpcOps + fun runStateMachinesView(out: RenderPrintWriter, rpcOps: CordaRPCOps): Any? { + val proxy = rpcOps val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } val subscriber = FlowWatchPrintingSubscriber(out) @@ -395,7 +395,7 @@ object InteractiveShell { return result } - private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture? { + private fun printAndFollowRPCResponse(response: Any?, toStream: PrintWriter): CordaFuture { val printerFun = yamlMapper::writeValueAsString toStream.println(printerFun(response)) toStream.flush() @@ -422,28 +422,31 @@ object InteractiveShell { override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) - toStream.flush() } @Synchronized override fun onError(e: Throwable) { toStream.println("Observable completed with an error") - e.printStackTrace() + e.printStackTrace(toStream) future.setException(e) } } - private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): OpenFuture? { + private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, toStream: PrintWriter): CordaFuture { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C - if (response == null) return null + if (response == null) return doneFuture(Unit) val observable: Observable<*> = when (response) { is Observable<*> -> response - is DataFeed<*, *> -> response.updates - else -> return null + is DataFeed<*, *> -> { + toStream.println("Snapshot") + toStream.println(response.snapshot) + response.updates + } + else -> return doneFuture(Unit) } val subscriber = PrintingSubscriber(printerFun, toStream) diff --git a/node/src/main/kotlin/net/corda/node/shell/InteractiveShellCommand.kt b/node/src/main/kotlin/net/corda/node/shell/InteractiveShellCommand.kt index 1c352c9bd4..9278fffb95 100644 --- a/node/src/main/kotlin/net/corda/node/shell/InteractiveShellCommand.kt +++ b/node/src/main/kotlin/net/corda/node/shell/InteractiveShellCommand.kt @@ -4,12 +4,14 @@ import com.fasterxml.jackson.databind.ObjectMapper import net.corda.core.messaging.CordaRPCOps import net.corda.node.services.api.ServiceHubInternal import org.crsh.command.BaseCommand +import org.crsh.shell.impl.command.CRaSHSession /** * Simply extends CRaSH BaseCommand to add easy access to the RPC ops class. */ open class InteractiveShellCommand : BaseCommand() { - fun ops() = context.attributes["ops"] as CordaRPCOps + fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps + fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer fun services() = context.attributes["services"] as ServiceHubInternal fun objectMapper() = context.attributes["mapper"] as ObjectMapper } diff --git a/node/src/main/kotlin/net/corda/node/shell/RPCOpsWithContext.kt b/node/src/main/kotlin/net/corda/node/shell/RPCOpsWithContext.kt new file mode 100644 index 0000000000..75a17d7486 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/shell/RPCOpsWithContext.kt @@ -0,0 +1,210 @@ +package net.corda.node.shell + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.context.InvocationContext +import net.corda.core.contracts.ContractState +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowLogic +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.messaging.* +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.AttachmentId +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.* +import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.getOrThrow +import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT +import net.corda.node.services.messaging.RpcAuthContext +import net.corda.node.services.messaging.RpcPermissions +import rx.Observable +import java.io.InputStream +import java.security.PublicKey +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Future + +class RPCOpsWithContext(val cordaRPCOps: CordaRPCOps, val invocationContext:InvocationContext, val rpcPermissions: RpcPermissions) : CordaRPCOps { + + + class RPCContextRunner(val invocationContext:InvocationContext, val permissions:RpcPermissions, val block:() -> T) : Thread() { + private var result: CompletableFuture = CompletableFuture() + override fun run() { + CURRENT_RPC_CONTEXT.set(RpcAuthContext(invocationContext, permissions)) + try { + result.complete(block()) + } catch (e:Throwable) { + result.completeExceptionally(e) + } + CURRENT_RPC_CONTEXT.remove() + } + + fun get(): Future { + start() + join() + return result + } + } + + override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.uploadAttachmentWithMetadata(jar, uploader, filename) }.get().getOrThrow() + } + + override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.queryAttachments(query, sorting) }.get().getOrThrow() + } + + override fun nodeStateObservable(): Observable { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.nodeStateObservable() }.get().getOrThrow() + } + + override fun vaultTrackByWithSorting(contractStateType: Class, criteria: QueryCriteria, sorting: Sort): DataFeed, Vault.Update> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackByWithSorting(contractStateType, criteria, sorting) }.get().getOrThrow() + } + + override fun vaultTrackByWithPagingSpec(contractStateType: Class, criteria: QueryCriteria, paging: PageSpecification): DataFeed, Vault.Update> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackByWithPagingSpec(contractStateType, criteria, paging) }.get().getOrThrow() + } + + override fun vaultTrackByCriteria(contractStateType: Class, criteria: QueryCriteria): DataFeed, Vault.Update> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackByCriteria(contractStateType, criteria) }.get().getOrThrow() + } + + override fun vaultTrack(contractStateType: Class): DataFeed, Vault.Update> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrack(contractStateType) }.get().getOrThrow() + } + + override fun vaultQueryByWithSorting(contractStateType: Class, criteria: QueryCriteria, sorting: Sort): Vault.Page { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryByWithSorting(contractStateType, criteria, sorting) }.get().getOrThrow() + } + + override fun vaultQueryByWithPagingSpec(contractStateType: Class, criteria: QueryCriteria, paging: PageSpecification): Vault.Page { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryByWithPagingSpec(contractStateType, criteria, paging) }.get().getOrThrow() + } + + override fun vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class): Vault.Page { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryByCriteria(criteria, contractStateType) }.get().getOrThrow() + } + + override fun vaultQuery(contractStateType: Class): Vault.Page { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQuery(contractStateType) }.get().getOrThrow() + } + + override fun stateMachinesSnapshot(): List { + return RPCContextRunner(invocationContext, rpcPermissions, cordaRPCOps::stateMachinesSnapshot).get().getOrThrow() + } + + override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { + return RPCContextRunner(invocationContext, rpcPermissions, cordaRPCOps::stateMachinesFeed).get().getOrThrow() + } + + override fun vaultQueryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class): Vault.Page { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultQueryBy(criteria, paging, sorting, contractStateType) }.get().getOrThrow() + } + + override fun vaultTrackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class): DataFeed, Vault.Update> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.vaultTrackBy(criteria, paging, sorting, contractStateType) }.get().getOrThrow() + } + + override fun internalVerifiedTransactionsSnapshot(): List { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.internalVerifiedTransactionsSnapshot() }.get().getOrThrow() + } + + override fun internalVerifiedTransactionsFeed(): DataFeed, SignedTransaction> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.internalVerifiedTransactionsFeed() }.get().getOrThrow() + } + + override fun stateMachineRecordedTransactionMappingSnapshot(): List { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.stateMachineRecordedTransactionMappingSnapshot() }.get().getOrThrow() + } + + override fun stateMachineRecordedTransactionMappingFeed(): DataFeed, StateMachineTransactionMapping> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.stateMachineRecordedTransactionMappingFeed() }.get().getOrThrow() + } + + override fun networkMapSnapshot(): List { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.networkMapSnapshot() }.get().getOrThrow() + } + + override fun networkMapFeed(): DataFeed, NetworkMapCache.MapChange> { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.networkMapFeed() }.get().getOrThrow() + } + + override fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.startFlowDynamic(logicType, *args) }.get().getOrThrow() + } + + override fun startTrackedFlowDynamic(logicType: Class>, vararg args: Any?): FlowProgressHandle { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.startTrackedFlowDynamic(logicType, *args) }.get().getOrThrow() + } + + override fun nodeInfo(): NodeInfo { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.nodeInfo() }.get().getOrThrow() + } + + override fun notaryIdentities(): List { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.notaryIdentities() }.get().getOrThrow() + } + + override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.addVaultTransactionNote(txnId, txnNote) }.get().getOrThrow() + } + + override fun getVaultTransactionNotes(txnId: SecureHash): Iterable { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.getVaultTransactionNotes(txnId) }.get().getOrThrow() + } + + override fun attachmentExists(id: SecureHash): Boolean { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.attachmentExists(id) }.get().getOrThrow() + } + + override fun openAttachment(id: SecureHash): InputStream { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.openAttachment(id) }.get().getOrThrow() + } + + override fun uploadAttachment(jar: InputStream): SecureHash { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.uploadAttachment(jar) }.get().getOrThrow() + } + + override fun currentNodeTime(): Instant { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.currentNodeTime() }.get().getOrThrow() + } + + override fun waitUntilNetworkReady(): CordaFuture { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.waitUntilNetworkReady() }.get().getOrThrow() + } + + override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.wellKnownPartyFromAnonymous(party) }.get().getOrThrow() + } + + override fun partyFromKey(key: PublicKey): Party? { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.partyFromKey(key) }.get().getOrThrow() + } + + override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.wellKnownPartyFromX500Name(x500Name) }.get().getOrThrow() + } + + override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.notaryPartyFromX500Name(x500Name) }.get().getOrThrow() + } + + override fun partiesFromName(query: String, exactMatch: Boolean): Set { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.partiesFromName(query, exactMatch) }.get().getOrThrow() + } + + override fun registeredFlows(): List { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.registeredFlows() }.get().getOrThrow() + } + + override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.nodeInfoFromParty(party) }.get().getOrThrow() + } + + override fun clearNetworkMapCache() { + return RPCContextRunner(invocationContext, rpcPermissions) { cordaRPCOps.clearNetworkMapCache() }.get().getOrThrow() + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/utilities/ANSIProgressRenderer.kt b/node/src/main/kotlin/net/corda/node/utilities/ANSIProgressRenderer.kt index 8339d0ae22..303f6e91c0 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/ANSIProgressRenderer.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/ANSIProgressRenderer.kt @@ -1,138 +1,113 @@ package net.corda.node.utilities import net.corda.core.internal.Emoji -import net.corda.core.utilities.ProgressTracker -import net.corda.node.utilities.ANSIProgressRenderer.progressTracker +import net.corda.core.messaging.FlowProgressHandle import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.core.LogEvent import org.apache.logging.log4j.core.LoggerContext import org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender import org.apache.logging.log4j.core.appender.ConsoleAppender import org.apache.logging.log4j.core.appender.OutputStreamManager +import org.crsh.text.RenderPrintWriter import org.fusesource.jansi.Ansi import org.fusesource.jansi.AnsiConsole import org.fusesource.jansi.AnsiOutputStream import rx.Subscription -/** - * Knows how to render a [ProgressTracker] to the terminal using coloured, emoji-fied output. Useful when writing small - * command line tools, demos, tests etc. Just set the [progressTracker] field and it will go ahead and start drawing - * if the terminal supports it. Otherwise it just prints out the name of the step whenever it changes. - * - * When a progress tracker is on the screen, it takes over the bottom part and reconfigures logging so that, assuming - * 1 log event == 1 line, the progress tracker is always glued to the bottom and logging scrolls above it. - * - * TODO: More thread safety - */ -object ANSIProgressRenderer { +abstract class ANSIProgressRenderer { + + private var subscriptionIndex: Subscription? = null + private var subscriptionTree: Subscription? = null + + protected var usingANSI = false + protected var checkEmoji = false + + protected var treeIndex: Int = 0 + protected var tree: List> = listOf() + private var installedYet = false - private var subscription: Subscription? = null - private var usingANSI = false - - var progressTracker: ProgressTracker? = null - set(value) { - subscription?.unsubscribe() - - field = value - if (!installedYet) { - setup() - } - - // Reset the state when a new tracker is wired up. - if (value != null) { - prevMessagePrinted = null - prevLinesDrawn = 0 - draw(true) - subscription = value.changes.subscribe({ draw(true) }, { done(it) }, { done(null) }) - } - } - - var onDone: () -> Unit = {} - - private fun done(error: Throwable?) { - if (error == null) progressTracker = null - draw(true, error) - onDone() - } - - private fun setup() { - AnsiConsole.systemInstall() - - // This line looks weird as hell because the magic code to decide if we really have a TTY or not isn't - // actually exposed anywhere as a function (weak sauce). So we have to rely on our knowledge of jansi - // implementation details. - usingANSI = AnsiConsole.wrapOutputStream(System.out) !is AnsiOutputStream - - if (usingANSI) { - // This super ugly code hacks into log4j and swaps out its console appender for our own. It's a bit simpler - // than doing things the official way with a dedicated plugin, etc, as it avoids mucking around with all - // the config XML and lifecycle goop. - val manager = LogManager.getContext(false) as LoggerContext - val consoleAppender = manager.configuration.appenders.values.filterIsInstance().single { it.name == "Console-Appender" } - val scrollingAppender = object : AbstractOutputStreamAppender( - consoleAppender.name, consoleAppender.layout, consoleAppender.filter, - consoleAppender.ignoreExceptions(), true, consoleAppender.manager) { - override fun append(event: LogEvent) { - // We lock on the renderer to avoid threads that are logging to the screen simultaneously messing - // things up. Of course this slows stuff down a bit, but only whilst this little utility is in use. - // Eventually it will be replaced with a real GUI and we can delete all this. - synchronized(ANSIProgressRenderer) { - if (progressTracker != null) { - val ansi = Ansi.ansi() - repeat(prevLinesDrawn) { ansi.eraseLine().cursorUp(1).eraseLine() } - System.out.print(ansi) - System.out.flush() - } - - super.append(event) - - if (progressTracker != null) - draw(false) - } - } - } - scrollingAppender.start() - manager.configuration.appenders[consoleAppender.name] = scrollingAppender - val loggerConfigs = manager.configuration.loggers.values - for (config in loggerConfigs) { - val appenderRefs = config.appenderRefs - val consoleAppenders = config.appenders.filter { it.value is ConsoleAppender }.keys - consoleAppenders.forEach { config.removeAppender(it) } - appenderRefs.forEach { config.addAppender(manager.configuration.appenders[it.ref], it.level, it.filter) } - } - manager.updateLoggers() - } - - installedYet = true - } + private var onDone: () -> Unit = {} // prevMessagePrinted is just for non-ANSI mode. private var prevMessagePrinted: String? = null // prevLinesDraw is just for ANSI mode. - private var prevLinesDrawn = 0 + protected var prevLinesDrawn = 0 - @Synchronized private fun draw(moveUp: Boolean, error: Throwable? = null) { + private fun done(error: Throwable?) { + if (error == null) _render(null) + draw(true, error) + onDone() + } + + fun render(flowProgressHandle: FlowProgressHandle<*>, onDone: () -> Unit = {}) { + this.onDone = onDone + _render(flowProgressHandle) + } + + protected abstract fun printLine(line:String) + + protected abstract fun printAnsi(ansi:Ansi) + + protected abstract fun setup() + + private fun _render(flowProgressHandle: FlowProgressHandle<*>?) { + subscriptionIndex?.unsubscribe() + subscriptionTree?.unsubscribe() + treeIndex = 0 + tree = listOf() + + if (!installedYet) { + setup() + installedYet = true + } + + prevMessagePrinted = null + prevLinesDrawn = 0 + draw(true) + + + flowProgressHandle?.apply { + stepsTreeIndexFeed?.apply { + treeIndex = snapshot + subscriptionIndex = updates.subscribe({ + treeIndex = it + draw(true) + }, { done(it) }, { done(null) }) + } + stepsTreeFeed?.apply { + tree = snapshot + subscriptionTree = updates.subscribe({ + tree = it + draw(true) + }, { done(it) }, { done(null) }) + } + } + } + + + + @Synchronized protected fun draw(moveUp: Boolean, error: Throwable? = null) { if (!usingANSI) { - val currentMessage = progressTracker?.currentStepRecursive?.label + val currentMessage = tree.getOrNull(treeIndex)?.second if (currentMessage != null && currentMessage != prevMessagePrinted) { - println(currentMessage) + printLine(currentMessage) prevMessagePrinted = currentMessage } return } - Emoji.renderIfSupported { + fun printingBody() { // Handle the case where the number of steps in a progress tracker is changed during execution. - val ansi = Ansi.ansi() + val ansi = Ansi() if (prevLinesDrawn > 0 && moveUp) ansi.cursorUp(prevLinesDrawn) // Put a blank line between any logging and us. ansi.eraseLine() ansi.newline() - val pt = progressTracker ?: return - var newLinesDrawn = 1 + pt.renderLevel(ansi, 0, error != null) + if (tree.isEmpty()) return + var newLinesDrawn = 1 + renderLevel(ansi, error != null) if (error != null) { ansi.a("${Emoji.skullAndCrossbones} ${error.message}") @@ -152,46 +127,137 @@ object ANSIProgressRenderer { } prevLinesDrawn = newLinesDrawn - // Need to force a flush here in order to ensure stderr/stdout sync up properly. - System.out.print(ansi) - System.out.flush() + printAnsi(ansi) } + if (checkEmoji) { + Emoji.renderIfSupported(::printingBody) + } else { + printingBody() + } } // Returns number of lines rendered. - private fun ProgressTracker.renderLevel(ansi: Ansi, indent: Int, error: Boolean): Int { + private fun renderLevel(ansi: Ansi, error: Boolean): Int { with(ansi) { var lines = 0 - for ((index, step) in steps.withIndex()) { - // Don't bother rendering these special steps in some cases. - if (step == ProgressTracker.UNSTARTED) continue - if (indent > 0 && step == ProgressTracker.DONE) continue + for ((index, step) in tree.withIndex()) { val marker = when { - index < stepIndex -> "${Emoji.greenTick} " - index == stepIndex && step == ProgressTracker.DONE -> "${Emoji.greenTick} " - index == stepIndex -> "${Emoji.rightArrow} " + index < treeIndex -> "${Emoji.greenTick} " + treeIndex == tree.lastIndex -> "${Emoji.greenTick} " + index == treeIndex -> "${Emoji.rightArrow} " error -> "${Emoji.noEntry} " else -> " " // Not reached yet. } - a(" ".repeat(indent)) + a(" ".repeat(step.first)) a(marker) - val active = index == stepIndex && step != ProgressTracker.DONE + val active = index == treeIndex if (active) bold() - a(step.label) + a(step.second) if (active) boldOff() eraseLine(Ansi.Erase.FORWARD) newline() lines++ - - val child = getChildProgressTracker(step) - if (child != null) - lines += child.renderLevel(ansi, indent + 1, error) } return lines } } + + +} + +class CRaSHNSIProgressRenderer(val renderPrintWriter:RenderPrintWriter) : ANSIProgressRenderer() { + + override fun printLine(line: String) { + renderPrintWriter.println(line) + } + + override fun printAnsi(ansi: Ansi) { + renderPrintWriter.print(ansi) + renderPrintWriter.flush() + } + + override fun setup() { + //we assume SSH always use ansi + usingANSI = true + } + + +} + +/** + * Knows how to render a [FlowProgressHandle] to the terminal using coloured, emoji-fied output. Useful when writing small + * command line tools, demos, tests etc. Just call [draw] method and it will go ahead and start drawing + * if the terminal supports it. Otherwise it just prints out the name of the step whenever it changes. + * + * When a progress tracker is on the screen, it takes over the bottom part and reconfigures logging so that, assuming + * 1 log event == 1 line, the progress tracker is always glued to the bottom and logging scrolls above it. + * + * TODO: More thread safety + */ +object StdoutANSIProgressRenderer : ANSIProgressRenderer() { + + override fun setup() { + AnsiConsole.systemInstall() + + checkEmoji = true + + // This line looks weird as hell because the magic code to decide if we really have a TTY or not isn't + // actually exposed anywhere as a function (weak sauce). So we have to rely on our knowledge of jansi + // implementation details. + usingANSI = AnsiConsole.wrapOutputStream(System.out) !is AnsiOutputStream + + if (usingANSI) { + // This super ugly code hacks into log4j and swaps out its console appender for our own. It's a bit simpler + // than doing things the official way with a dedicated plugin, etc, as it avoids mucking around with all + // the config XML and lifecycle goop. + val manager = LogManager.getContext(false) as LoggerContext + val consoleAppender = manager.configuration.appenders.values.filterIsInstance().single { it.name == "Console-Appender" } + val scrollingAppender = object : AbstractOutputStreamAppender( + consoleAppender.name, consoleAppender.layout, consoleAppender.filter, + consoleAppender.ignoreExceptions(), true, consoleAppender.manager) { + override fun append(event: LogEvent) { + // We lock on the renderer to avoid threads that are logging to the screen simultaneously messing + // things up. Of course this slows stuff down a bit, but only whilst this little utility is in use. + // Eventually it will be replaced with a real GUI and we can delete all this. + synchronized(StdoutANSIProgressRenderer) { + if (tree.isNotEmpty()) { + val ansi = Ansi.ansi() + repeat(prevLinesDrawn) { ansi.eraseLine().cursorUp(1).eraseLine() } + System.out.print(ansi) + System.out.flush() + } + + super.append(event) + + if (tree.isNotEmpty()) + draw(false) + } + } + } + scrollingAppender.start() + manager.configuration.appenders[consoleAppender.name] = scrollingAppender + val loggerConfigs = manager.configuration.loggers.values + for (config in loggerConfigs) { + val appenderRefs = config.appenderRefs + val consoleAppenders = config.appenders.filter { it.value is ConsoleAppender }.keys + consoleAppenders.forEach { config.removeAppender(it) } + appenderRefs.forEach { config.addAppender(manager.configuration.appenders[it.ref], it.level, it.filter) } + } + manager.updateLoggers() + } + } + + override fun printLine(line:String) { + System.out.println(line) + } + + override fun printAnsi(ansi: Ansi) { + // Need to force a flush here in order to ensure stderr/stdout sync up properly. + System.out.print(ansi) + System.out.flush() + } } diff --git a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt index 622fc9667c..5f55195c22 100644 --- a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt +++ b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt @@ -2,11 +2,17 @@ package net.corda.node import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import net.corda.client.jackson.JacksonSupport +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.Amount import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.objectOrNewInstance +import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.FlowProgressHandleImpl import net.corda.core.utilities.ProgressTracker import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.shell.InteractiveShell @@ -20,6 +26,7 @@ import net.corda.testing.rigorousMock import org.junit.After import org.junit.Before import org.junit.Test +import rx.Observable import java.util.* import kotlin.test.assertEquals @@ -36,8 +43,8 @@ class InteractiveShellTest { @Suppress("UNUSED") class FlowA(val a: String) : FlowLogic() { - constructor(b: Int) : this(b.toString()) - constructor(b: Int, c: String) : this(b.toString() + c) + constructor(b: Int?) : this(b.toString()) + constructor(b: Int?, c: String) : this(b.toString() + c) constructor(amount: Amount) : this(amount.toString()) constructor(pair: Pair, SecureHash.SHA256>) : this(pair.toString()) constructor(party: Party) : this(party.name.toString()) @@ -50,9 +57,16 @@ class InteractiveShellTest { private val om = JacksonSupport.createInMemoryMapper(ids, YAMLFactory()) private fun check(input: String, expected: String) { - var output: DummyFSM? = null - InteractiveShell.runFlowFromString({ DummyFSM(it as FlowA).apply { output = this } }, input, FlowA::class.java, om) - assertEquals(expected, output!!.logic.a, input) + var output: String? = null + InteractiveShell.runFlowFromString( { clazz, args -> + + val instance = clazz.getConstructor(*args.map { it!!::class.java }.toTypedArray()).newInstance(*args) as FlowA + output = instance.a + val future = openFuture() + future.set("ABC") + FlowProgressHandleImpl(StateMachineRunId.createRandom(), future, Observable.just("Some string")) + }, input, FlowA::class.java, om) + assertEquals(expected, output!!, input) } @Test diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 269db1b6d0..b00bebad91 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -85,7 +85,8 @@ private val DRIVER_REQUIRED_PERMISSIONS = setOf( invokeRpc(CordaRPCOps::nodeInfoFromParty), invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed), invokeRpc("vaultQueryBy"), - invokeRpc("vaultTrackBy") + invokeRpc("vaultTrackBy"), + invokeRpc(CordaRPCOps::registeredFlows) ) /** diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index 7aed47e4c6..aea7accf55 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -12,6 +12,7 @@ import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.createDirectories import net.corda.core.internal.createDirectory import net.corda.core.internal.uncheckedCast +import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient @@ -43,6 +44,8 @@ import net.corda.testing.node.MockServices.Companion.makeTestDataSourcePropertie import net.corda.testing.setGlobalSerialization import net.corda.testing.testNodeConfiguration import org.apache.activemq.artemis.utils.ReusableLatch +import org.apache.sshd.common.util.security.SecurityUtils +import org.slf4j.Logger import java.io.Closeable import java.math.BigInteger import java.nio.file.Path @@ -124,6 +127,13 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete /** Helper constructor for creating a [MockNetwork] with custom parameters from Java. */ constructor(parameters: MockNetworkParameters) : this(defaultParameters = parameters) + init { + // Apache SSHD for whatever reason registers a SFTP FileSystemProvider - which gets loaded by JimFS. + // This SFTP support loads BouncyCastle, which we want to avoid. + // Please see https://issues.apache.org/jira/browse/SSHD-736 - it's easier then to create our own fork of SSHD + SecurityUtils.setAPrioriDisabledProvider("BC", true) + } + var nextNodeId = 0 private set private val filesystem = Jimfs.newFileSystem(unix()) @@ -269,6 +279,10 @@ class MockNetwork(defaultParameters: MockNetworkParameters = MockNetworkParamete return E2ETestKeyManagementService(identityService, keyPairs) } + override fun startShell(rpcOps: CordaRPCOps) { + //No mock shell + } + override fun startMessagingService(rpcOps: RPCOps) { // Nothing to do }