CORDA-2569: Add "flow kill <ID>" command to Corda's shell. (#4861)

* CORDA-2569: Add "flow kill <ID>" command to Corda's shell.

* Add testing and documentation for RPC killFlow operation.
This commit is contained in:
Chris Rankin 2019-03-08 16:39:22 +00:00 committed by Tommy Lillehagen
parent b3b184c93e
commit ea263b3e54
18 changed files with 226 additions and 80 deletions

View File

@ -15,16 +15,20 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier
import com.fasterxml.jackson.databind.deser.ContextualDeserializer
import com.fasterxml.jackson.databind.deser.std.DelegatingDeserializer
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.node.IntNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.ser.BeanPropertyWriter
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier
import com.fasterxml.jackson.databind.ser.std.StdScalarSerializer
import com.fasterxml.jackson.databind.ser.std.UUIDSerializer
import com.google.common.primitives.Booleans
import net.corda.client.jackson.JacksonSupport
import net.corda.core.contracts.*
import net.corda.core.crypto.*
import net.corda.core.crypto.PartialMerkleTree.PartialTree
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.*
import net.corda.core.internal.DigitalSignatureWithCert
import net.corda.core.internal.createComponentGroups
@ -40,7 +44,6 @@ import net.corda.core.utilities.parseAsHex
import net.corda.core.utilities.toHexString
import net.corda.serialization.internal.AllWhitelist
import net.corda.serialization.internal.amqp.*
import net.corda.serialization.internal.model.LocalTypeInformation
import java.math.BigDecimal
import java.security.PublicKey
import java.security.cert.CertPath
@ -79,6 +82,7 @@ class CordaModule : SimpleModule("corda-core") {
context.setMixInAnnotations(SignatureMetadata::class.java, SignatureMetadataMixin::class.java)
context.setMixInAnnotations(PartialTree::class.java, PartialTreeMixin::class.java)
context.setMixInAnnotations(NodeInfo::class.java, NodeInfoMixin::class.java)
context.setMixInAnnotations(StateMachineRunId::class.java, StateMachineRunIdMixin::class.java)
}
}
@ -418,6 +422,28 @@ private interface SecureHashSHA256Mixin
@JsonDeserialize(using = JacksonSupport.PublicKeyDeserializer::class)
private interface PublicKeyMixin
@JsonSerialize(using = StateMachineRunIdSerializer::class)
@JsonDeserialize(using = StateMachineRunIdDeserializer::class)
private interface StateMachineRunIdMixin
private class StateMachineRunIdSerializer : StdScalarSerializer<StateMachineRunId>(StateMachineRunId::class.java) {
private val uuidSerializer = UUIDSerializer()
override fun isEmpty(provider: SerializerProvider?, value: StateMachineRunId): Boolean {
return uuidSerializer.isEmpty(provider, value.uuid)
}
override fun serialize(value: StateMachineRunId, gen: JsonGenerator?, provider: SerializerProvider?) {
uuidSerializer.serialize(value.uuid, gen, provider)
}
}
private class StateMachineRunIdDeserializer : FromStringDeserializer<StateMachineRunId>(StateMachineRunId::class.java) {
override fun _deserialize(value: String, ctxt: DeserializationContext?): StateMachineRunId {
return StateMachineRunId(UUID.fromString(value))
}
}
@Suppress("unused_parameter")
@ToStringSerialize
private abstract class AmountMixin @JsonCreator(mode = DISABLED) constructor(

View File

@ -0,0 +1,29 @@
package net.corda.client.jackson
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.client.jackson.internal.CordaModule
import net.corda.core.flows.StateMachineRunId
import org.junit.Assert.assertEquals
import org.junit.Test
import java.util.*
class StateMachineRunIdTest {
private companion object {
private const val ID = "a9da3d32-a08d-4add-a633-66bc6bf6183d"
private val jsonMapper: ObjectMapper = ObjectMapper().registerModule(CordaModule())
}
@Test
fun `state machine run ID deserialise`() {
val str = """"$ID""""
val runID = jsonMapper.readValue(str, StateMachineRunId::class.java)
assertEquals(StateMachineRunId(UUID.fromString(ID)), runID)
}
@Test
fun `state machine run ID serialise`() {
val runId = StateMachineRunId(UUID.fromString(ID))
val str = jsonMapper.writeValueAsString(runId)
assertEquals(""""$ID"""", str)
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static java.util.Collections.singletonList;
import static kotlin.test.AssertionsKt.assertEquals;
import static kotlin.test.AssertionsKt.fail;
import static net.corda.finance.workflows.GetBalances.getCashBalance;
@ -51,13 +52,12 @@ public class StandaloneCordaRPCJavaClientTest {
}
}
private List<String> perms = Collections.singletonList("ALL");
private List<String> perms = singletonList("ALL");
private Set<String> permSet = new HashSet<>(perms);
private User rpcUser = new User("user1", "test", permSet);
private User superUser = new User("superUser", "test", permSet);
private AtomicInteger port = new AtomicInteger(15000);
private NodeProcess.Factory factory;
private NodeProcess notary;
private CordaRPCOps rpcProxy;
private CordaRPCConnection connection;
@ -69,16 +69,16 @@ public class StandaloneCordaRPCJavaClientTest {
port.getAndIncrement(),
port.getAndIncrement(),
true,
Collections.singletonList(rpcUser),
singletonList(superUser),
true
);
@Before
public void setUp() {
factory = new NodeProcess.Factory();
NodeProcess.Factory factory = new NodeProcess.Factory();
copyCordapps(factory, notaryConfig);
notary = factory.create(notaryConfig);
connection = notary.connect();
connection = notary.connect(superUser);
rpcProxy = connection.getProxy();
notaryNodeIdentity = rpcProxy.nodeInfo().getLegalIdentities().get(0);
}
@ -95,7 +95,7 @@ public class StandaloneCordaRPCJavaClientTest {
}
@Test
public void testCashBalances() throws NoSuchFieldException, ExecutionException, InterruptedException {
public void testCashBalances() throws ExecutionException, InterruptedException {
Amount<Currency> dollars123 = new Amount<>(123, Currency.getInstance("USD"));
FlowHandle<AbstractCashFlow.Result> flowHandle = rpcProxy.startFlowDynamic(CashIssueFlow.class,
@ -105,7 +105,7 @@ public class StandaloneCordaRPCJavaClientTest {
flowHandle.getReturnValue().get();
Amount<Currency> balance = getCashBalance(rpcProxy, Currency.getInstance("USD"));
System.out.print("Balance: " + balance + "\n");
System.out.println("Balance: " + balance);
assertEquals(dollars123, balance, "matching");
}

View File

@ -3,6 +3,7 @@ package net.corda.kotlin.rpc
import com.google.common.hash.Hashing
import com.google.common.hash.HashingInputStream
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.PermissionException
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
@ -29,10 +30,8 @@ import net.corda.nodeapi.internal.config.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import org.apache.commons.io.output.NullOutputStream
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import org.junit.*
import org.junit.rules.ExpectedException
import java.io.FilterInputStream
import java.io.InputStream
import java.util.*
@ -46,7 +45,10 @@ import kotlin.test.assertTrue
class StandaloneCordaRPClientTest {
private companion object {
private val log = contextLogger()
val user = User("user1", "test", permissions = setOf("ALL"))
val superUser = User("superUser", "test", permissions = setOf("ALL"))
val nonUser = User("nonUser", "test", permissions = emptySet())
val rpcUser = User("rpcUser", "test", permissions = setOf("InvokeRpc.startFlow", "InvokeRpc.killFlow"))
val flowUser = User("flowUser", "test", permissions = setOf("StartFlow.net.corda.finance.flows.CashIssueFlow"))
val port = AtomicInteger(15200)
const val attachmentSize = 2116
val timeout = 60.seconds
@ -65,15 +67,18 @@ class StandaloneCordaRPClientTest {
rpcPort = port.andIncrement,
rpcAdminPort = port.andIncrement,
isNotary = true,
users = listOf(user)
users = listOf(superUser, nonUser, rpcUser, flowUser)
)
@get:Rule
val exception: ExpectedException = ExpectedException.none()
@Before
fun setUp() {
factory = NodeProcess.Factory()
StandaloneCordaRPCJavaClientTest.copyCordapps(factory, notaryConfig)
notary = factory.create(notaryConfig)
connection = notary.connect()
connection = notary.connect(superUser)
rpcProxy = connection.proxy
notaryNode = fetchNotaryIdentity()
notaryNodeIdentity = rpcProxy.nodeInfo().legalIdentitiesAndCerts.first().party
@ -81,9 +86,7 @@ class StandaloneCordaRPClientTest {
@After
fun done() {
try {
connection.close()
} finally {
connection.use {
notary.close()
}
}
@ -232,6 +235,27 @@ class StandaloneCordaRPClientTest {
assertEquals(629.DOLLARS, balance)
}
@Test
fun `test kill flow without killFlow permission`() {
exception.expect(PermissionException::class.java)
exception.expectMessage("User not authorized to perform RPC call killFlow")
val flowHandle = rpcProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), notaryNodeIdentity)
notary.connect(nonUser).use { connection ->
val rpcProxy = connection.proxy
rpcProxy.killFlow(flowHandle.id)
}
}
@Test
fun `test kill flow with killFlow permission`() {
val flowHandle = rpcProxy.startFlow(::CashIssueFlow, 83.DOLLARS, OpaqueBytes.of(0), notaryNodeIdentity)
notary.connect(rpcUser).use { connection ->
val rpcProxy = connection.proxy
assertTrue(rpcProxy.killFlow(flowHandle.id))
}
}
private fun fetchNotaryIdentity(): NodeInfo {
val nodeInfo = rpcProxy.networkMapSnapshot()
assertEquals(1, nodeInfo.size)

View File

@ -21,7 +21,7 @@ import kotlin.streams.toList
class NodeVersioningTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val superUser = User("superUser", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
@ -33,7 +33,7 @@ class NodeVersioningTest {
rpcPort = port.andIncrement,
rpcAdminPort = port.andIncrement,
isNotary = true,
users = listOf(user)
users = listOf(superUser)
)
private val aliceConfig = NodeConfig(
@ -42,7 +42,7 @@ class NodeVersioningTest {
rpcPort = port.andIncrement,
rpcAdminPort = port.andIncrement,
isNotary = false,
users = listOf(user)
users = listOf(superUser)
)
private lateinit var notary: NodeProcess
@ -73,7 +73,7 @@ class NodeVersioningTest {
selfCordapp.copyToDirectory(cordappsDir)
factory.create(aliceConfig).use { alice ->
alice.connect().use {
alice.connect(superUser).use {
val rpc = it.proxy
assertThat(rpc.protocolVersion).isEqualTo(PLATFORM_VERSION)
assertThat(rpc.nodeInfo().platformVersion).isEqualTo(PLATFORM_VERSION)

View File

@ -38,7 +38,7 @@ import kotlin.streams.toList
class CordappSmokeTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val superUser = User("superUser", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
@ -50,7 +50,7 @@ class CordappSmokeTest {
rpcPort = port.andIncrement,
rpcAdminPort = port.andIncrement,
isNotary = true,
users = listOf(user)
users = listOf(superUser)
)
private val aliceConfig = NodeConfig(
@ -59,7 +59,7 @@ class CordappSmokeTest {
rpcPort = port.andIncrement,
rpcAdminPort = port.andIncrement,
isNotary = false,
users = listOf(user)
users = listOf(superUser)
)
private lateinit var notary: NodeProcess
@ -92,7 +92,7 @@ class CordappSmokeTest {
createDummyNodeInfo(additionalNodeInfoDir)
factory.create(aliceConfig).use { alice ->
alice.connect().use { connectionToAlice ->
alice.connect(superUser).use { connectionToAlice ->
val aliceIdentity = connectionToAlice.proxy.nodeInfo().legalIdentitiesAndCerts.first().party
val future = connectionToAlice.proxy.startFlow(::GatherContextsFlow, aliceIdentity).returnValue
val (sessionInitContext, sessionConfirmContext) = future.getOrThrow()

View File

@ -26,9 +26,11 @@ Permissions
When accessing the shell (embedded, standalone, via SSH) RPC permissions are required. This is because the shell actually communicates
with the node using RPC calls.
* Watching flows (``flow watch``) requires ``InvokeRpc.stateMachinesFeed``
* Watching flows (``flow watch``) requires ``InvokeRpc.stateMachinesFeed``.
* Starting flows requires ``InvokeRpc.startTrackedFlowDynamic``, ``InvokeRpc.registeredFlows`` and ``InvokeRpc.wellKnownPartyFromX500Name``, as well as a
permission for the flow being started
permission for the flow being started.
* Killing flows (``flow kill``) requires ``InvokeRpc.killFlow``. This currently
allows the user to kill *any* flow, so please be careful when granting it!
The shell via the local terminal
--------------------------------
@ -231,6 +233,7 @@ The shell also has special commands for working with flows:
names and types of the arguments will be used to try and automatically determine which one to use. If the match
against available constructors is unclear, the reasons each available constructor failed to match will be printed
out. In the case of an ambiguous match, the first applicable constructor will be used
* ``flow kill`` kills a single flow, as identified by its UUID.
Parameter syntax
****************

View File

@ -39,14 +39,14 @@ internal class AuthenticatedRpcOpsProxy(private val delegate: CordaRPCOps) : Cor
}
private class PermissionsEnforcingInvocationHandler(override val delegate: CordaRPCOps, private val context: () -> RpcAuthContext) : InvocationHandlerTemplate {
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?) = guard(method.name, context, { super.invoke(proxy, method, arguments) })
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?) = guard(method.name, context) { super.invoke(proxy, method, arguments) }
}
}
private fun <RESULT> guard(methodName: String, context: () -> RpcAuthContext, action: () -> RESULT) = guard(methodName, emptyList(), context, action)
private fun <RESULT> guard(methodName: String, args: List<Class<*>>, context: () -> RpcAuthContext, action: () -> RESULT): RESULT {
if (!context().isPermitted(methodName, *(args.map { it.name }.toTypedArray()))) {
if (!context().isPermitted(methodName, *(args.map(Class<*>::getName).toTypedArray()))) {
throw PermissionException("User not authorized to perform RPC call $methodName with target $args")
} else {
return action()

View File

@ -26,7 +26,6 @@ import org.apache.shiro.realm.AuthorizingRealm
import org.apache.shiro.realm.jdbc.JdbcRealm
import org.apache.shiro.subject.PrincipalCollection
import org.apache.shiro.subject.SimplePrincipalCollection
import java.io.Closeable
import java.util.concurrent.ConcurrentHashMap
import javax.security.auth.login.FailedLoginException
@ -66,7 +65,7 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig, cacheFactory: NamedCache
manager = manager)
override fun close() {
manager.realms?.filterIsInstance<Closeable>()?.forEach { it.close() }
manager.realms?.filterIsInstance<AutoCloseable>()?.forEach(AutoCloseable::close)
manager.destroy()
}
@ -197,31 +196,31 @@ private fun buildCredentialMatcher(type: PasswordEncryption) = when (type) {
}
class InMemoryRealm(users: List<User>,
realmId: String,
passwordEncryption: PasswordEncryption = PasswordEncryption.NONE) : AuthorizingRealm() {
realmId: String,
passwordEncryption: PasswordEncryption = PasswordEncryption.NONE) : AuthorizingRealm() {
private val authorizationInfoByUser: Map<String, AuthorizationInfo>
private val authenticationInfoByUser: Map<String, AuthenticationInfo>
init {
permissionResolver = RPCPermissionResolver
users.forEach {
require(it.username.matches("\\w+".toRegex())) {
"Username ${it.username} contains invalid characters"
users.forEach { user ->
require(user.username.matches("\\w+".toRegex())) {
"Username ${user.username} contains invalid characters"
}
}
val resolvePermission = { s: String -> permissionResolver.resolvePermission(s) }
authorizationInfoByUser = users.associate {
it.username to SimpleAuthorizationInfo().apply {
objectPermissions = it.permissions.map { resolvePermission(it) }.toSet()
val resolvePermission = permissionResolver::resolvePermission
authorizationInfoByUser = users.associate { user ->
user.username to SimpleAuthorizationInfo().apply {
objectPermissions = user.permissions.map(resolvePermission).toSet()
roles = emptySet<String>()
stringPermissions = emptySet<String>()
}
}
authenticationInfoByUser = users.associate {
it.username to SimpleAuthenticationInfo().apply {
credentials = it.password
principals = SimplePrincipalCollection(it.username, realmId)
authenticationInfoByUser = users.associate { user ->
user.username to SimpleAuthenticationInfo().apply {
credentials = user.password
principals = SimplePrincipalCollection(user.username, realmId)
}
}
credentialsMatcher = buildCredentialMatcher(passwordEncryption)
@ -236,7 +235,7 @@ class InMemoryRealm(users: List<User>,
authorizationInfoByUser[principals.primaryPrincipal as String]
}
private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource) : JdbcRealm(), Closeable {
private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource) : JdbcRealm(), AutoCloseable {
init {
credentialsMatcher = buildCredentialMatcher(config.passwordEncryption)
@ -246,7 +245,7 @@ private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource
}
override fun close() {
(dataSource as? Closeable)?.close()
(dataSource as? AutoCloseable)?.close()
}
}

View File

@ -7,6 +7,7 @@ import net.corda.node.internal.security.Password
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.tryAuthenticate
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.config.SecurityConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.testing.internal.TestingNamedCacheFactory
@ -31,7 +32,7 @@ class RPCSecurityManagerTest {
@Test
fun `Generic RPC call authorization`() {
checkUserActions(
permitted = setOf(arrayListOf("nodeInfo"), arrayListOf("notaryIdentities")),
permitted = setOf(listOf("nodeInfo"), listOf("notaryIdentities")),
permissions = setOf(
invokeRpc(CordaRPCOps::nodeInfo),
invokeRpc(CordaRPCOps::notaryIdentities)))
@ -40,41 +41,49 @@ class RPCSecurityManagerTest {
@Test
fun `Flow invocation authorization`() {
checkUserActions(
permissions = setOf(Permissions.startFlow<DummyFlow>()),
permissions = setOf(startFlow<DummyFlow>()),
permitted = setOf(
arrayListOf("startTrackedFlowDynamic", "net.corda.node.services.RPCSecurityManagerTest\$DummyFlow"),
arrayListOf("startFlowDynamic", "net.corda.node.services.RPCSecurityManagerTest\$DummyFlow")))
listOf("startTrackedFlowDynamic", DummyFlow::class.java.name),
listOf("startFlowDynamic", DummyFlow::class.java.name)))
}
@Test
fun `Check startFlow RPC permission implies startFlowDynamic`() {
checkUserActions(
permissions = setOf(invokeRpc("startFlow")),
permitted = setOf(arrayListOf("startFlow"), arrayListOf("startFlowDynamic")))
permitted = setOf(listOf("startFlow"), listOf("startFlowDynamic")))
}
@Test
fun `Check startTrackedFlow RPC permission implies startTrackedFlowDynamic`() {
checkUserActions(
permitted = setOf(arrayListOf("startTrackedFlow"), arrayListOf("startTrackedFlowDynamic")),
permitted = setOf(listOf("startTrackedFlow"), listOf("startTrackedFlowDynamic")),
permissions = setOf(invokeRpc("startTrackedFlow")))
}
@Test
fun `check killFlow RPC permission accepted`() {
checkUserActions(
permitted = setOf(listOf("killFlow")),
permissions = setOf(invokeRpc(CordaRPCOps::killFlow))
)
}
@Test
fun `Admin authorization`() {
checkUserActions(
permissions = setOf("all"),
permitted = allActions.map { arrayListOf(it) }.toSet())
permitted = allActions.map { listOf(it) }.toSet())
}
@Test
fun `flows draining mode permissions`() {
checkUserActions(
permitted = setOf(arrayListOf("setFlowsDrainingModeEnabled")),
permitted = setOf(listOf("setFlowsDrainingModeEnabled")),
permissions = setOf(invokeRpc(CordaRPCOps::setFlowsDrainingModeEnabled))
)
checkUserActions(
permitted = setOf(arrayListOf("isFlowsDrainingModeEnabled")),
permitted = setOf(listOf("isFlowsDrainingModeEnabled")),
permissions = setOf(invokeRpc(CordaRPCOps::isFlowsDrainingModeEnabled))
)
}
@ -134,7 +143,7 @@ class RPCSecurityManagerTest {
users = listOf(User(username, "password", setOf())), id = AuthServiceId("TEST"))
}
private fun checkUserActions(permissions: Set<String>, permitted: Set<ArrayList<String>>) {
private fun checkUserActions(permissions: Set<String>, permitted: Set<List<String>>) {
val user = User(username = "user", password = "password", permissions = permissions)
val userRealms = RPCSecurityManagerImpl(SecurityConfiguration.AuthService.fromUsers(listOf(user)), TestingNamedCacheFactory())
val disabled = allActions.filter { !permitted.contains(listOf(it)) }

View File

@ -314,12 +314,10 @@ open class MockServices private constructor(
constructor(initialIdentityName: CordaX500Name, identityService: IdentityService = makeTestIdentityService())
: this(listOf(getCallerPackage(MockServices::class)!!), TestIdentity(initialIdentityName), identityService)
@JvmOverloads
constructor(cordappPackages: List<String>, initialIdentityName: CordaX500Name, identityService: IdentityService, networkParameters: NetworkParameters)
: this(cordappPackages, TestIdentity(initialIdentityName), identityService, networkParameters)
@JvmOverloads
constructor(cordappPackages: List<String>, initialIdentityName: CordaX500Name, identityService: IdentityService, networkParameters: NetworkParameters, key: KeyPair)
: this(cordappPackages, TestIdentity(initialIdentityName, key), identityService, networkParameters)

View File

@ -695,7 +695,8 @@ class DriverDSLImpl(
Permissions.invokeRpc(CordaRPCOps::internalFindVerifiedTransaction),
Permissions.invokeRpc("vaultQueryBy"),
Permissions.invokeRpc("vaultTrackBy"),
Permissions.invokeRpc(CordaRPCOps::registeredFlows)
Permissions.invokeRpc(CordaRPCOps::registeredFlows),
Permissions.invokeRpc(CordaRPCOps::killFlow)
)
private fun <A> oneOf(array: Array<A>) = array[Random().nextInt(array.size)]

View File

@ -9,6 +9,7 @@ import net.corda.core.node.NotaryInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.common.internal.asContextEnv
import net.corda.testing.common.internal.checkNotOnClasspath
@ -33,8 +34,7 @@ class NodeProcess(
private val log = contextLogger()
}
fun connect(): CordaRPCConnection {
val user = config.users[0]
fun connect(user: User): CordaRPCConnection {
return client.start(user.username, user.password)
}

View File

@ -22,11 +22,6 @@ sourceSets {
srcDir file('src/integration-test/resources')
}
}
test {
resources {
srcDir file('src/test/resources')
}
}
}
dependencies {

View File

@ -15,12 +15,12 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import static java.util.stream.Collectors.joining;
import static net.corda.tools.shell.InteractiveShell.killFlowById;
import static net.corda.tools.shell.InteractiveShell.runFlowByNameFragment;
import static net.corda.tools.shell.InteractiveShell.runStateMachinesView;
@Man(
"Allows you to start flows, list the ones available and to watch flows currently running on the node.\n\n" +
"Allows you to start and kill flows, list the ones available and to watch flows currently running on the node.\n\n" +
"Starting flow is the primary way in which you command the node to change the ledger.\n\n" +
"This command is generic, so the right way to use it depends on the flow you wish to start. You can use the 'flow start'\n" +
"command with either a full class name, or a substring of the class name that's unambiguous. The parameters to the \n" +
@ -28,7 +28,7 @@ import static net.corda.tools.shell.InteractiveShell.runStateMachinesView;
)
public class FlowShellCommand extends InteractiveShellCommand {
private static Logger logger = LoggerFactory.getLogger(FlowShellCommand.class);
private static final Logger logger = LoggerFactory.getLogger(FlowShellCommand.class);
@Command
@Usage("Start a (work)flow on the node. This is how you can change the ledger.")
@ -36,13 +36,13 @@ public class FlowShellCommand extends InteractiveShellCommand {
@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name,
@Usage("The data to pass as input") @Argument(unquote = false) List<String> input
) {
logger.info("Executing command \"flow start {} {}\",", name, (input != null) ? input.stream().collect(joining(" ")) : "<no arguments>");
logger.info("Executing command \"flow start {} {}\",", name, (input != null) ? String.join(" ", input) : "<no arguments>");
startFlow(name, input, out, ops(), ansiProgressRenderer(), objectMapper(null));
}
// TODO Limit number of flows shown option?
@Command
@Usage("watch information about state machines running on the node with result information")
@Usage("Watch information about state machines running on the node with result information.")
public void watch(InvocationContext<TableElement> context) throws Exception {
logger.info("Executing command \"flow watch\".");
runStateMachinesView(out, ops());
@ -63,11 +63,20 @@ public class FlowShellCommand extends InteractiveShellCommand {
}
@Command
@Usage("list flows that user can start")
@Usage("List flows that user can start.")
public void list(InvocationContext<String> context) throws Exception {
logger.info("Executing command \"flow list\".");
for (String name : ops().registeredFlows()) {
context.provide(name + System.lineSeparator());
}
}
@Command
@Usage("Kill a flow that is running on this node.")
public void kill(
@Usage("The UUID for the flow that we wish to kill") @Argument String id
) {
logger.info("Executing command \"flow kill {}\".", id);
killFlowById(id, out, ops(), objectMapper(null));
}
}

View File

@ -1,6 +1,5 @@
package net.corda.tools.shell;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import net.corda.client.jackson.StringToMethodCallParser;
@ -20,14 +19,13 @@ import java.util.Map;
import java.util.Set;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.joining;
// Note that this class cannot be converted to Kotlin because CRaSH does not understand InvocationContext<Map<?, ?>> which
// is the closest you can get in Kotlin to raw types.
public class RunShellCommand extends InteractiveShellCommand {
private static Logger logger = LoggerFactory.getLogger(RunShellCommand.class);
private static final Logger logger = LoggerFactory.getLogger(RunShellCommand.class);
@Command
@Man(
@ -39,7 +37,7 @@ public class RunShellCommand extends InteractiveShellCommand {
@Usage("runs a method from the CordaRPCOps interface on the node.")
public Object main(InvocationContext<Map> context,
@Usage("The command to run") @Argument(unquote = false) List<String> command) {
logger.info("Executing command \"run {}\",", (command != null) ? command.stream().collect(joining(" ")) : "<no arguments>");
logger.info("Executing command \"run {}\",", (command != null) ? String.join(" ", command) : "<no arguments>");
StringToMethodCallParser<CordaRPCOps> parser = new StringToMethodCallParser<>(CordaRPCOps.class, objectMapper(InteractiveShell.getCordappsClassloader()));
if (command == null) {

View File

@ -1,6 +1,7 @@
package net.corda.tools.shell
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.module.SimpleModule
@ -16,6 +17,7 @@ import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
@ -348,6 +350,30 @@ object InteractiveShell {
return innerLoop(type)
}
@JvmStatic
fun killFlowById(id: String,
output: RenderPrintWriter,
rpcOps: CordaRPCOps,
inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) {
try {
val runId = try {
inputObjectMapper.readValue(id, StateMachineRunId::class.java)
} catch (e: JsonMappingException) {
output.println("Cannot parse flow ID of '$id' - expecting a UUID.", Color.red)
log.error("Failed to parse flow ID", e)
return
}
if (rpcOps.killFlow(runId)) {
output.println("Killed flow $runId", Color.yellow)
} else {
output.println("Failed to kill flow $runId", Color.red)
}
} finally {
output.flush()
}
}
// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.
/**
* Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts

View File

@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.type.TypeFactory
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.nhaarman.mockito_kotlin.eq
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
@ -30,6 +30,7 @@ import net.corda.testing.core.TestIdentity
import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.testing.internal.DEV_ROOT_CA
import org.crsh.command.InvocationContext
import org.crsh.text.Color
import org.crsh.text.RenderPrintWriter
import org.junit.Before
import org.junit.Test
@ -128,7 +129,7 @@ class InteractiveShellTest {
}
private fun objectMapperWithClassLoader(classLoader: ClassLoader?): ObjectMapper {
val objectMapper = ObjectMapper()
val objectMapper = JacksonSupport.createNonRpcMapper()
val tf = TypeFactory.defaultInstance().withClassLoader(classLoader)
objectMapper.typeFactory = tf
@ -231,6 +232,34 @@ class InteractiveShellTest {
verify(printWriter).println(NETWORK_MAP_JSON_PAYLOAD.replace("\n", System.lineSeparator()))
}
@Test
fun killFlowWithNonsenseID() {
InteractiveShell.killFlowById("nonsense", printWriter, cordaRpcOps, om)
verify(printWriter).println("Cannot parse flow ID of 'nonsense' - expecting a UUID.", Color.red)
verify(printWriter).flush()
}
@Test
fun killFlowFailure() {
val runId = StateMachineRunId.createRandom()
whenever(cordaRpcOps.killFlow(any())).thenReturn(false)
InteractiveShell.killFlowById(runId.uuid.toString(), printWriter, cordaRpcOps, om)
verify(cordaRpcOps).killFlow(runId)
verify(printWriter).println("Failed to kill flow $runId", Color.red)
verify(printWriter).flush()
}
@Test
fun killFlowSuccess() {
val runId = StateMachineRunId.createRandom()
whenever(cordaRpcOps.killFlow(any())).thenReturn(true)
InteractiveShell.killFlowById(runId.uuid.toString(), printWriter, cordaRpcOps, om)
verify(cordaRpcOps).killFlow(runId)
verify(printWriter).println("Killed flow $runId", Color.yellow)
verify(printWriter).flush()
}
}
@ToStringSerialize