diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 09b766e46d..8770bf5465 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -14,6 +14,12 @@ public void setMessage(String) public void setOriginalExceptionClassName(String) ## +public final class net.corda.core.CordaOID extends java.lang.Object + @org.jetbrains.annotations.NotNull public static final String CORDA_PLATFORM = "1.3.6.1.4.1.50530.1" + public static final net.corda.core.CordaOID INSTANCE + @org.jetbrains.annotations.NotNull public static final String R3_ROOT = "1.3.6.1.4.1.50530" + @org.jetbrains.annotations.NotNull public static final String X509_EXTENSION_CORDA_ROLE = "1.3.6.1.4.1.50530.1.1" +## @net.corda.core.serialization.CordaSerializable public class net.corda.core.CordaRuntimeException extends java.lang.RuntimeException implements net.corda.core.CordaThrowable public (String) public (String, String, Throwable) @@ -617,6 +623,7 @@ public static final class net.corda.core.contracts.UniqueIdentifier$Companion ex @org.jetbrains.annotations.NotNull public abstract String getName() @org.jetbrains.annotations.NotNull public abstract List getRpcFlows() @org.jetbrains.annotations.NotNull public abstract List getSchedulableFlows() + @org.jetbrains.annotations.NotNull public abstract List getSerializationCustomSerializers() @org.jetbrains.annotations.NotNull public abstract List getSerializationWhitelists() @org.jetbrains.annotations.NotNull public abstract List getServiceFlows() @org.jetbrains.annotations.NotNull public abstract List getServices() @@ -1542,6 +1549,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec @org.jetbrains.annotations.NotNull public abstract Iterable getVaultTransactionNotes(net.corda.core.crypto.SecureHash) @kotlin.Deprecated @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed internalVerifiedTransactionsFeed() @kotlin.Deprecated @org.jetbrains.annotations.NotNull public abstract List internalVerifiedTransactionsSnapshot() + public abstract boolean killFlow(net.corda.core.flows.StateMachineRunId) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed networkMapFeed() @org.jetbrains.annotations.NotNull public abstract List networkMapSnapshot() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo nodeInfo() @@ -1902,6 +1910,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l @net.corda.core.DoNotImplement public interface net.corda.core.node.services.TransactionStorage extends net.corda.core.node.StateLoader @org.jetbrains.annotations.Nullable public abstract net.corda.core.transactions.SignedTransaction getTransaction(net.corda.core.crypto.SecureHash) @org.jetbrains.annotations.NotNull public abstract rx.Observable getUpdates() + @org.jetbrains.annotations.NotNull public abstract net.corda.core.contracts.TransactionState loadState(net.corda.core.contracts.StateRef) @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed track() @org.jetbrains.annotations.NotNull public abstract net.corda.core.concurrent.CordaFuture trackTransaction(net.corda.core.crypto.SecureHash) ## @@ -2636,6 +2645,7 @@ public final class net.corda.core.schemas.CommonSchema extends java.lang.Object public static final net.corda.core.schemas.CommonSchema INSTANCE ## public final class net.corda.core.schemas.CommonSchemaV1 extends net.corda.core.schemas.MappedSchema + @org.jetbrains.annotations.NotNull protected String getMigrationResource() public static final net.corda.core.schemas.CommonSchemaV1 INSTANCE ## @javax.persistence.MappedSuperclass @net.corda.core.serialization.CordaSerializable public static class net.corda.core.schemas.CommonSchemaV1$FungibleState extends net.corda.core.schemas.PersistentState @@ -2666,6 +2676,7 @@ public final class net.corda.core.schemas.CommonSchemaV1 extends net.corda.core. public class net.corda.core.schemas.MappedSchema extends java.lang.Object public (Class, int, Iterable) @org.jetbrains.annotations.NotNull public final Iterable getMappedTypes() + @org.jetbrains.annotations.Nullable protected String getMigrationResource() @org.jetbrains.annotations.NotNull public final String getName() public final int getVersion() @org.jetbrains.annotations.NotNull public String toString() @@ -2691,6 +2702,9 @@ public class net.corda.core.schemas.MappedSchema extends java.lang.Object public final void setTxId(String) public String toString() ## +public final class net.corda.core.schemas.PersistentTypesKt extends java.lang.Object + @org.jetbrains.annotations.NotNull public static final String getMigrationResource(net.corda.core.schemas.MappedSchema) +## @net.corda.core.serialization.CordaSerializable public interface net.corda.core.schemas.QueryableState extends net.corda.core.contracts.ContractState @org.jetbrains.annotations.NotNull public abstract net.corda.core.schemas.PersistentState generateMappedObject(net.corda.core.schemas.MappedSchema) @org.jetbrains.annotations.NotNull public abstract Iterable supportedSchemas() diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt index fae5fbc6d5..25ad584211 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt @@ -1,9 +1,6 @@ package net.corda.client.rpc -import net.corda.core.flows.FlowLogic -import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.RPCOps -import net.corda.node.services.Permissions import net.corda.node.services.messaging.rpcContext import net.corda.nodeapi.internal.config.User import net.corda.testing.node.internal.RPCDriverDSL @@ -122,6 +119,17 @@ class RPCPermissionsTests : AbstractRPCTest() { } } + @Test + fun `killing flows requires permission`() { + + rpcDriver { + val proxy = testProxyFor(userOf("joe", emptySet())) + assertNotAllowed { + proxy.validatePermission("killFlow") + } + } + } + private fun assertNotAllowed(action: () -> Unit) { assertFailsWith(PermissionException::class, "User should not be allowed to perform this action.", action) diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index b6880a8103..a44b62ccc0 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -223,6 +223,13 @@ interface CordaRPCOps : RPCOps { @RPCReturnsObservables fun startTrackedFlowDynamic(logicType: Class>, vararg args: Any?): FlowProgressHandle + /** + * Attempts to kill a flow. This is not a clean termination and should be reserved for exceptional cases such as stuck fibers. + * + * @return whether the flow existed and was killed. + */ + fun killFlow(id: StateMachineRunId): Boolean + /** Returns Node's NodeInfo, assuming this will not change while the node is running. */ fun nodeInfo(): NodeInfo diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index d21c3105fe..0017ec6630 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -15,6 +15,10 @@ UNRELEASED Doorman, and ``CONFIDENTIAL_IDENTITY`` certificates must be issued from a ``WELL_KNOWN_LEGAL_IDENTITY`` certificate. For a detailed specification of the extension please see :doc:`permissioning-certificate-specification`. +* ``CordaRPCOps`` now exposes a function ``killFlow(id: StateMachineRunId): Boolean`` that attempts to terminate a flow. + This is not a clean termination and should be reserved for exceptional cases such as fibers that get stuck. + As usual, invoking this new function requires a user to be entitled with its specific permission. + * The network map service concept has been re-designed. More information can be found in :doc:`network-map`. * The previous design was never intended to be final but was rather a quick implementation in the earliest days of the diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b3f7ebbec3..000675a048 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -9,6 +9,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party @@ -91,6 +92,8 @@ internal class CordaRPCOpsImpl( return snapshot } + override fun killFlow(id: StateMachineRunId) = smm.killFlow(id) + override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { return database.transaction { val (allStateMachines, changes) = smm.track() diff --git a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt index d2f0f8afc1..e074bfb4b4 100644 --- a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt +++ b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt @@ -4,6 +4,7 @@ import net.corda.client.rpc.PermissionException import net.corda.core.contracts.ContractState import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party @@ -65,6 +66,10 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val implementation.startTrackedFlowDynamic(logicType, *args) } + override fun killFlow(id: StateMachineRunId): Boolean = guard("killFlow") { + return implementation.killFlow(id) + } + override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo) override fun notaryIdentities(): List = guard("notaryIdentities", implementation::notaryIdentities) diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 227780cb4c..4a55d7163a 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -15,10 +15,10 @@ interface CheckpointStorage { fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) /** - * Remove existing checkpoint from the store. It is an error to attempt to remove a checkpoint which doesn't exist - * in the store. Doing so will throw an [IllegalArgumentException]. + * Remove existing checkpoint from the store. + * @return whether the id matched a checkpoint that was removed. */ - fun removeCheckpoint(id: StateMachineRunId) + fun removeCheckpoint(id: StateMachineRunId): Boolean /** * Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index fcad94611e..91b64b81e6 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -42,13 +42,13 @@ class DBCheckpointStorage : CheckpointStorage { }) } - override fun removeCheckpoint(id: StateMachineRunId) { + override fun removeCheckpoint(id: StateMachineRunId): Boolean { val session = DatabaseTransactionManager.current().session val criteriaBuilder = session.criteriaBuilder val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java) val root = delete.from(DBCheckpoint::class.java) delete.where(criteriaBuilder.equal(root.get(DBCheckpoint::checkpointId.name), id.uuid.toString())) - session.createQuery(delete).executeUpdate() + return session.createQuery(delete).executeUpdate() > 0 } override fun getAllCheckpoints(): Stream>> { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 2a1bc3b219..924de20d51 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -78,6 +78,13 @@ interface StateMachineManager { * Returns all currently live flows. */ val allStateMachines: List> + + /** + * Attempts to kill a flow. This is not a clean termination and should be reserved for exceptional cases such as stuck fibers. + * + * @return whether the flow existed and was killed. + */ + fun killFlow(id: StateMachineRunId): Boolean } // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt index a281e7fc4d..00bcae9d92 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt @@ -187,6 +187,30 @@ class StateMachineManagerImpl( ) } + override fun killFlow(id: StateMachineRunId): Boolean { + + return mutex.locked { + val flow = flows.remove(id) + if (flow != null) { + logger.debug("Killing flow known to physical node.") + decrementLiveFibers() + unfinishedFibers.countDown() + try { + flow.fiber.interrupt() + true + } finally { + database.transaction { + checkpointStorage.removeCheckpoint(id) + } + } + } else { + // TODO replace with a clustered delete after we'll support clustered nodes + logger.debug("Unable to kill a flow unknown to physical node. Might be processed by another physical node.") + false + } + } + } + override fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId) { val previousFlowId = sessionToFlow.put(sessionId, flowId) if (previousFlowId != null) { diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index a554e194cd..b706edc037 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -1,5 +1,6 @@ package net.corda.node +import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.PermissionException import net.corda.core.context.AuthServiceId @@ -9,9 +10,7 @@ import net.corda.core.contracts.ContractState import net.corda.core.contracts.Issued import net.corda.core.crypto.isFulfilledBy import net.corda.core.crypto.keys -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.StartableByRPC -import net.corda.core.flows.StateMachineRunId +import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.messaging.* import net.corda.core.node.services.Vault @@ -19,6 +18,7 @@ import net.corda.core.node.services.queryBy import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap import net.corda.finance.DOLLARS import net.corda.finance.GBP import net.corda.finance.USD @@ -33,12 +33,16 @@ import net.corda.node.services.Permissions.Companion.startFlow import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT import net.corda.node.services.messaging.RpcAuthContext import net.corda.nodeapi.internal.config.User -import net.corda.testing.* +import net.corda.testing.ALICE_NAME +import net.corda.testing.expect +import net.corda.testing.expectEvents import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode import net.corda.testing.node.MockNodeParameters import net.corda.testing.node.testActor +import net.corda.testing.sequence import org.apache.commons.io.IOUtils +import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After import org.junit.Assert.assertArrayEquals @@ -289,6 +293,71 @@ class CordaRPCOpsImplTest { } } + @Test + fun `kill a stuck flow through RPC`() { + + withPermissions(startFlow(), invokeRpc(CordaRPCOps::killFlow), invokeRpc(CordaRPCOps::stateMachinesFeed), invokeRpc(CordaRPCOps::stateMachinesSnapshot)) { + + val flow = rpc.startFlow(::NewJoinerFlow) + + val killed = rpc.killFlow(flow.id) + + assertThat(killed).isTrue() + assertThat(rpc.stateMachinesSnapshot().map { info -> info.id }).doesNotContain(flow.id) + } + } + + @Test + fun `kill a waiting flow through RPC`() { + + withPermissions(startFlow(), invokeRpc(CordaRPCOps::killFlow), invokeRpc(CordaRPCOps::stateMachinesFeed), invokeRpc(CordaRPCOps::stateMachinesSnapshot)) { + + val flow = rpc.startFlow(::HopefulFlow, alice) + + val killed = rpc.killFlow(flow.id) + + assertThat(killed).isTrue() + assertThat(rpc.stateMachinesSnapshot().map { info -> info.id }).doesNotContain(flow.id) + } + } + + @Test + fun `kill a nonexistent flow through RPC`() { + + withPermissions(invokeRpc(CordaRPCOps::killFlow)) { + + val nonexistentFlowId = StateMachineRunId.createRandom() + + val killed = rpc.killFlow(nonexistentFlowId) + + assertThat(killed).isFalse() + } + } + + @StartableByRPC + class NewJoinerFlow : FlowLogic() { + + @Suspendable + override fun call(): String { + + logger.info("When can I join you say? Almost there buddy...") + Fiber.currentFiber().join() + return "You'll never get me!" + } + } + + @StartableByRPC + class HopefulFlow(private val party: Party) : FlowLogic() { + + @Suspendable + override fun call(): String { + + logger.info("Waiting for a miracle...") + val miracle = initiateFlow(party).receive().unwrap { it } + return miracle + } + } + class NonRPCFlow : FlowLogic() { @Suspendable override fun call() = Unit