[ENT-850]: RPC function to kill a flow (#215)

* Tentative API and implementation.

* Tests completed. API update needed.

* Updated api-current.txt. Some previous changes hadn't been reflected and now they are.

* Improved the tests.

* Some code review changes.

* Merge branch 'master' into features/ENT-850

# Conflicts:
#	.ci/api-current.txt

* Code review changes.

* Code review changes.
This commit is contained in:
Michele Sollecito 2017-12-20 19:16:54 +00:00 committed by GitHub
parent d9574338bc
commit 0711ad34e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 153 additions and 12 deletions

View File

@ -14,6 +14,12 @@
public void setMessage(String)
public void setOriginalExceptionClassName(String)
##
public final class net.corda.core.CordaOID extends java.lang.Object
@org.jetbrains.annotations.NotNull public static final String CORDA_PLATFORM = "1.3.6.1.4.1.50530.1"
public static final net.corda.core.CordaOID INSTANCE
@org.jetbrains.annotations.NotNull public static final String R3_ROOT = "1.3.6.1.4.1.50530"
@org.jetbrains.annotations.NotNull public static final String X509_EXTENSION_CORDA_ROLE = "1.3.6.1.4.1.50530.1.1"
##
@net.corda.core.serialization.CordaSerializable public class net.corda.core.CordaRuntimeException extends java.lang.RuntimeException implements net.corda.core.CordaThrowable
public <init>(String)
public <init>(String, String, Throwable)
@ -617,6 +623,7 @@ public static final class net.corda.core.contracts.UniqueIdentifier$Companion ex
@org.jetbrains.annotations.NotNull public abstract String getName()
@org.jetbrains.annotations.NotNull public abstract List getRpcFlows()
@org.jetbrains.annotations.NotNull public abstract List getSchedulableFlows()
@org.jetbrains.annotations.NotNull public abstract List getSerializationCustomSerializers()
@org.jetbrains.annotations.NotNull public abstract List getSerializationWhitelists()
@org.jetbrains.annotations.NotNull public abstract List getServiceFlows()
@org.jetbrains.annotations.NotNull public abstract List getServices()
@ -1542,6 +1549,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
@org.jetbrains.annotations.NotNull public abstract Iterable getVaultTransactionNotes(net.corda.core.crypto.SecureHash)
@kotlin.Deprecated @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed internalVerifiedTransactionsFeed()
@kotlin.Deprecated @org.jetbrains.annotations.NotNull public abstract List internalVerifiedTransactionsSnapshot()
public abstract boolean killFlow(net.corda.core.flows.StateMachineRunId)
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed networkMapFeed()
@org.jetbrains.annotations.NotNull public abstract List networkMapSnapshot()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo nodeInfo()
@ -1902,6 +1910,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
@net.corda.core.DoNotImplement public interface net.corda.core.node.services.TransactionStorage extends net.corda.core.node.StateLoader
@org.jetbrains.annotations.Nullable public abstract net.corda.core.transactions.SignedTransaction getTransaction(net.corda.core.crypto.SecureHash)
@org.jetbrains.annotations.NotNull public abstract rx.Observable getUpdates()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.contracts.TransactionState loadState(net.corda.core.contracts.StateRef)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed track()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.concurrent.CordaFuture trackTransaction(net.corda.core.crypto.SecureHash)
##
@ -2636,6 +2645,7 @@ public final class net.corda.core.schemas.CommonSchema extends java.lang.Object
public static final net.corda.core.schemas.CommonSchema INSTANCE
##
public final class net.corda.core.schemas.CommonSchemaV1 extends net.corda.core.schemas.MappedSchema
@org.jetbrains.annotations.NotNull protected String getMigrationResource()
public static final net.corda.core.schemas.CommonSchemaV1 INSTANCE
##
@javax.persistence.MappedSuperclass @net.corda.core.serialization.CordaSerializable public static class net.corda.core.schemas.CommonSchemaV1$FungibleState extends net.corda.core.schemas.PersistentState
@ -2666,6 +2676,7 @@ public final class net.corda.core.schemas.CommonSchemaV1 extends net.corda.core.
public class net.corda.core.schemas.MappedSchema extends java.lang.Object
public <init>(Class, int, Iterable)
@org.jetbrains.annotations.NotNull public final Iterable getMappedTypes()
@org.jetbrains.annotations.Nullable protected String getMigrationResource()
@org.jetbrains.annotations.NotNull public final String getName()
public final int getVersion()
@org.jetbrains.annotations.NotNull public String toString()
@ -2691,6 +2702,9 @@ public class net.corda.core.schemas.MappedSchema extends java.lang.Object
public final void setTxId(String)
public String toString()
##
public final class net.corda.core.schemas.PersistentTypesKt extends java.lang.Object
@org.jetbrains.annotations.NotNull public static final String getMigrationResource(net.corda.core.schemas.MappedSchema)
##
@net.corda.core.serialization.CordaSerializable public interface net.corda.core.schemas.QueryableState extends net.corda.core.contracts.ContractState
@org.jetbrains.annotations.NotNull public abstract net.corda.core.schemas.PersistentState generateMappedObject(net.corda.core.schemas.MappedSchema)
@org.jetbrains.annotations.NotNull public abstract Iterable supportedSchemas()

View File

@ -1,9 +1,6 @@
package net.corda.client.rpc
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.node.services.Permissions
import net.corda.node.services.messaging.rpcContext
import net.corda.nodeapi.internal.config.User
import net.corda.testing.node.internal.RPCDriverDSL
@ -122,6 +119,17 @@ class RPCPermissionsTests : AbstractRPCTest() {
}
}
@Test
fun `killing flows requires permission`() {
rpcDriver {
val proxy = testProxyFor(userOf("joe", emptySet()))
assertNotAllowed {
proxy.validatePermission("killFlow")
}
}
}
private fun assertNotAllowed(action: () -> Unit) {
assertFailsWith(PermissionException::class, "User should not be allowed to perform this action.", action)

View File

@ -223,6 +223,13 @@ interface CordaRPCOps : RPCOps {
@RPCReturnsObservables
fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T>
/**
* Attempts to kill a flow. This is not a clean termination and should be reserved for exceptional cases such as stuck fibers.
*
* @return whether the flow existed and was killed.
*/
fun killFlow(id: StateMachineRunId): Boolean
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo

View File

@ -15,6 +15,10 @@ UNRELEASED
Doorman, and ``CONFIDENTIAL_IDENTITY`` certificates must be issued from a ``WELL_KNOWN_LEGAL_IDENTITY`` certificate.
For a detailed specification of the extension please see :doc:`permissioning-certificate-specification`.
* ``CordaRPCOps`` now exposes a function ``killFlow(id: StateMachineRunId): Boolean`` that attempts to terminate a flow.
This is not a clean termination and should be reserved for exceptional cases such as fibers that get stuck.
As usual, invoking this new function requires a user to be entitled with its specific permission.
* The network map service concept has been re-designed. More information can be found in :doc:`network-map`.
* The previous design was never intended to be final but was rather a quick implementation in the earliest days of the

View File

@ -9,6 +9,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
@ -91,6 +92,8 @@ internal class CordaRPCOpsImpl(
return snapshot
}
override fun killFlow(id: StateMachineRunId) = smm.killFlow(id)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
return database.transaction {
val (allStateMachines, changes) = smm.track()

View File

@ -4,6 +4,7 @@ import net.corda.client.rpc.PermissionException
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
@ -65,6 +66,10 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
implementation.startTrackedFlowDynamic(logicType, *args)
}
override fun killFlow(id: StateMachineRunId): Boolean = guard("killFlow") {
return implementation.killFlow(id)
}
override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo)
override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities)

View File

@ -15,10 +15,10 @@ interface CheckpointStorage {
fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
/**
* Remove existing checkpoint from the store. It is an error to attempt to remove a checkpoint which doesn't exist
* in the store. Doing so will throw an [IllegalArgumentException].
* Remove existing checkpoint from the store.
* @return whether the id matched a checkpoint that was removed.
*/
fun removeCheckpoint(id: StateMachineRunId)
fun removeCheckpoint(id: StateMachineRunId): Boolean
/**
* Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the

View File

@ -42,13 +42,13 @@ class DBCheckpointStorage : CheckpointStorage {
})
}
override fun removeCheckpoint(id: StateMachineRunId) {
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
val session = DatabaseTransactionManager.current().session
val criteriaBuilder = session.criteriaBuilder
val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java)
val root = delete.from(DBCheckpoint::class.java)
delete.where(criteriaBuilder.equal(root.get<String>(DBCheckpoint::checkpointId.name), id.uuid.toString()))
session.createQuery(delete).executeUpdate()
return session.createQuery(delete).executeUpdate() > 0
}
override fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>> {

View File

@ -78,6 +78,13 @@ interface StateMachineManager {
* Returns all currently live flows.
*/
val allStateMachines: List<FlowLogic<*>>
/**
* Attempts to kill a flow. This is not a clean termination and should be reserved for exceptional cases such as stuck fibers.
*
* @return whether the flow existed and was killed.
*/
fun killFlow(id: StateMachineRunId): Boolean
}
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call

View File

@ -187,6 +187,30 @@ class StateMachineManagerImpl(
)
}
override fun killFlow(id: StateMachineRunId): Boolean {
return mutex.locked {
val flow = flows.remove(id)
if (flow != null) {
logger.debug("Killing flow known to physical node.")
decrementLiveFibers()
unfinishedFibers.countDown()
try {
flow.fiber.interrupt()
true
} finally {
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
}
} else {
// TODO replace with a clustered delete after we'll support clustered nodes
logger.debug("Unable to kill a flow unknown to physical node. Might be processed by another physical node.")
false
}
}
}
override fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId) {
val previousFlowId = sessionToFlow.put(sessionId, flowId)
if (previousFlowId != null) {

View File

@ -1,5 +1,6 @@
package net.corda.node
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.PermissionException
import net.corda.core.context.AuthServiceId
@ -9,9 +10,7 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.Issued
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.messaging.*
import net.corda.core.node.services.Vault
@ -19,6 +18,7 @@ import net.corda.core.node.services.queryBy
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.finance.DOLLARS
import net.corda.finance.GBP
import net.corda.finance.USD
@ -33,12 +33,16 @@ import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcAuthContext
import net.corda.nodeapi.internal.config.User
import net.corda.testing.*
import net.corda.testing.ALICE_NAME
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.testActor
import net.corda.testing.sequence
import org.apache.commons.io.IOUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Assert.assertArrayEquals
@ -289,6 +293,71 @@ class CordaRPCOpsImplTest {
}
}
@Test
fun `kill a stuck flow through RPC`() {
withPermissions(startFlow<NewJoinerFlow>(), invokeRpc(CordaRPCOps::killFlow), invokeRpc(CordaRPCOps::stateMachinesFeed), invokeRpc(CordaRPCOps::stateMachinesSnapshot)) {
val flow = rpc.startFlow(::NewJoinerFlow)
val killed = rpc.killFlow(flow.id)
assertThat(killed).isTrue()
assertThat(rpc.stateMachinesSnapshot().map { info -> info.id }).doesNotContain(flow.id)
}
}
@Test
fun `kill a waiting flow through RPC`() {
withPermissions(startFlow<HopefulFlow>(), invokeRpc(CordaRPCOps::killFlow), invokeRpc(CordaRPCOps::stateMachinesFeed), invokeRpc(CordaRPCOps::stateMachinesSnapshot)) {
val flow = rpc.startFlow(::HopefulFlow, alice)
val killed = rpc.killFlow(flow.id)
assertThat(killed).isTrue()
assertThat(rpc.stateMachinesSnapshot().map { info -> info.id }).doesNotContain(flow.id)
}
}
@Test
fun `kill a nonexistent flow through RPC`() {
withPermissions(invokeRpc(CordaRPCOps::killFlow)) {
val nonexistentFlowId = StateMachineRunId.createRandom()
val killed = rpc.killFlow(nonexistentFlowId)
assertThat(killed).isFalse()
}
}
@StartableByRPC
class NewJoinerFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
logger.info("When can I join you say? Almost there buddy...")
Fiber.currentFiber().join()
return "You'll never get me!"
}
}
@StartableByRPC
class HopefulFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
logger.info("Waiting for a miracle...")
val miracle = initiateFlow(party).receive<String>().unwrap { it }
return miracle
}
}
class NonRPCFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() = Unit