mirror of
https://github.com/corda/corda.git
synced 2024-12-24 07:06:44 +00:00
ENT-5768 startFlowWithClientId permissions (#6708)
Do not let a user reattach to a flow started by another user. Reattaching to a flow using startFlowWithClientId for a flow not started by the current user throws a PermissionException Reattaching to a flow using reattachFlowWithClientId for a flow not started by the current user returns null. finishedFlowsWithClientIds does not return flows started by other users. Normal rpc permissions around startFlowWithClientId and startFlowDynamicWithClientId has also been added. To allow admins to remove client ids as well as be able to see all the client ids on the node, admin versions have been added that bypass the user restrictions. These can be permitted via rpc to only provide their usage to admins.
This commit is contained in:
parent
b746d5cb22
commit
bd7b96e816
@ -54,6 +54,9 @@ class RPCPermissionsTests : AbstractRPCTest() {
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
}
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,6 +67,10 @@ class RPCPermissionsTests : AbstractRPCTest() {
|
||||
val proxy = testProxyFor(adminUser)
|
||||
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow")
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,6 +81,10 @@ class RPCPermissionsTests : AbstractRPCTest() {
|
||||
val proxy = testProxyFor(joeUser)
|
||||
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow")
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,6 +99,18 @@ class RPCPermissionsTests : AbstractRPCTest() {
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow")
|
||||
}
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.OtherFlow")
|
||||
}
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startFlow", "net.corda.flows.OtherFlow")
|
||||
}
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startTrackedFlow", "net.corda.flows.OtherFlow")
|
||||
}
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.OtherFlow")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,6 +141,16 @@ class RPCPermissionsTests : AbstractRPCTest() {
|
||||
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow")
|
||||
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.OtherFlow")
|
||||
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow")
|
||||
|
||||
proxy.validatePermission("startFlow", "net.corda.flows.OtherFlow")
|
||||
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow")
|
||||
proxy.validatePermission("startTrackedFlow", "net.corda.flows.OtherFlow")
|
||||
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.OtherFlow")
|
||||
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow")
|
||||
|
||||
assertNotAllowed {
|
||||
proxy.validatePermission("startTrackedFlowDynamic", "net.banned.flows.OtherFlow")
|
||||
}
|
||||
|
@ -314,6 +314,7 @@ interface CordaRPCOps : RPCOps {
|
||||
|
||||
/**
|
||||
* Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed.
|
||||
* This version will only remove flow's that were started by the same user currently calling [removeClientId].
|
||||
*
|
||||
* See [startFlowDynamicWithClientId] for more information.
|
||||
*
|
||||
@ -322,13 +323,32 @@ interface CordaRPCOps : RPCOps {
|
||||
fun removeClientId(clientId: String): Boolean
|
||||
|
||||
/**
|
||||
* Returns all finished flows that were started with a client id.
|
||||
* Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed.
|
||||
* This version can be called for all client ids, ignoring which user originally started a flow with [clientId].
|
||||
*
|
||||
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
|
||||
* [false] if completed exceptionally.
|
||||
* See [startFlowDynamicWithClientId] for more information.
|
||||
*
|
||||
* @return whether the mapping was removed.
|
||||
*/
|
||||
fun removeClientIdAsAdmin(clientId: String): Boolean
|
||||
|
||||
/**
|
||||
* Returns all finished flows that were started with a client ID for which the client ID mapping has not been removed. This version only
|
||||
* returns the client ids for flows started by the same user currently calling [finishedFlowsWithClientIds].
|
||||
*
|
||||
* @return A [Map] containing client ids for finished flows started by the user calling [finishedFlowsWithClientIds], mapped to [true]
|
||||
* if finished successfully, [false] if completed exceptionally.
|
||||
*/
|
||||
fun finishedFlowsWithClientIds(): Map<String, Boolean>
|
||||
|
||||
/**
|
||||
* Returns all finished flows that were started with a client id by all RPC users for which the client ID mapping has not been removed.
|
||||
*
|
||||
* @return A [Map] containing all client ids for finished flows, mapped to [true] if finished successfully,
|
||||
* [false] if completed exceptionally.
|
||||
*/
|
||||
fun finishedFlowsWithClientIdsAsAdmin(): Map<String, Boolean>
|
||||
|
||||
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
|
||||
fun nodeInfo(): NodeInfo
|
||||
|
||||
|
@ -1,6 +1,8 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.HospitalizeFlowException
|
||||
@ -15,12 +17,14 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.messaging.startFlowWithClientId
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.nodeapi.exceptions.RejectedCommandException
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
@ -28,11 +32,14 @@ import rx.Observable
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertNotEquals
|
||||
import kotlin.test.assertNull
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FlowWithClientIdTest {
|
||||
@ -42,7 +49,7 @@ class FlowWithClientIdTest {
|
||||
ResultFlow.hook = null
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `start flow with client id`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
@ -54,7 +61,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `remove client id`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
var counter = 0
|
||||
@ -77,7 +84,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `on flow unserializable result a 'CordaRuntimeException' is thrown containing in its message the unserializable type`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
@ -92,7 +99,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `If flow has an unserializable exception result then it gets converted into a 'CordaRuntimeException'`() {
|
||||
ResultFlow.hook = {
|
||||
throw UnserializableException()
|
||||
@ -120,7 +127,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattachFlowWithClientId can retrieve results from existing flow future`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
@ -154,7 +161,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
@ -228,7 +235,7 @@ class FlowWithClientIdTest {
|
||||
return rpc.startFlow(::IsFlowInStatus, id, status.ordinal).returnValue.getOrThrow(20.seconds)
|
||||
}
|
||||
|
||||
private fun <T: Exception> NodeHandle.hasException(id: StateMachineRunId, type: KClass<T>): Boolean {
|
||||
private fun <T : Exception> NodeHandle.hasException(id: StateMachineRunId, type: KClass<T>): Boolean {
|
||||
return rpc.startFlow(::GetExceptionType, id).returnValue.getOrThrow(20.seconds) == type.qualifiedName
|
||||
}
|
||||
|
||||
@ -244,8 +251,195 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattaching to existing running flow using startFlowWithClientId for flow started by another user throws a permission exception`() {
|
||||
val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all()))
|
||||
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
val latch = CountDownLatch(1)
|
||||
ResultFlow.hook = {
|
||||
latch.await()
|
||||
}
|
||||
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
val reattachedByStarter = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
|
||||
assertFailsWith<PermissionException> {
|
||||
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
}
|
||||
}
|
||||
|
||||
latch.countDown()
|
||||
|
||||
assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds))
|
||||
assertEquals(5, reattachedByStarter.returnValue.getOrThrow(20.seconds))
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattaching to existing completed flow using startFlowWithClientId for flow started by another user throws a permission exception`() {
|
||||
val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all()))
|
||||
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
|
||||
|
||||
assertFailsWith<PermissionException> {
|
||||
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattaching to existing completed flow using startFlowWithClientId for flow started by another user throws a permission exception (after node restart)`() {
|
||||
val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all()))
|
||||
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet(), inMemoryDB = false)) {
|
||||
var nodeA = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
|
||||
|
||||
nodeA.stop()
|
||||
nodeA = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user, spy)).getOrThrow(20.seconds)
|
||||
|
||||
assertFailsWith<PermissionException> {
|
||||
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattaching to existing flow using reattachFlowWithClientId for flow started by another user returns null`() {
|
||||
val user = User("dan", "this is my password", setOf(Permissions.all()))
|
||||
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
|
||||
|
||||
val reattachedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
it.proxy.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
|
||||
}
|
||||
|
||||
assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds))
|
||||
assertEquals(5, reattachedByStarter)
|
||||
assertNull(reattachedBySpy)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `removeClientId does not remove mapping for flows started by another user`() {
|
||||
val user = User("dan", "this is my password", setOf(Permissions.all()))
|
||||
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
|
||||
flowHandle.returnValue.getOrThrow(20.seconds)
|
||||
|
||||
val removedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
it.proxy.removeClientId(clientId)
|
||||
}
|
||||
|
||||
val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
|
||||
val removedByStarter = nodeA.rpc.removeClientId(clientId)
|
||||
|
||||
assertEquals(5, reattachedByStarter)
|
||||
assertTrue(removedByStarter)
|
||||
assertFalse(removedBySpy)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `removeClientIdAsAdmin does remove mapping for flows started by another user`() {
|
||||
val user = User("dan", "this is my password", setOf(Permissions.all()))
|
||||
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
|
||||
|
||||
flowHandle.returnValue.getOrThrow(20.seconds)
|
||||
|
||||
val removedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
it.proxy.removeClientIdAsAdmin(clientId)
|
||||
}
|
||||
|
||||
val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
|
||||
val removedByStarter = nodeA.rpc.removeClientIdAsAdmin(clientId)
|
||||
|
||||
assertNull(reattachedByStarter)
|
||||
assertFalse(removedByStarter)
|
||||
assertTrue(removedBySpy)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `finishedFlowsWithClientIds does not return flows started by other users`() {
|
||||
val user = User("CaptainAmerica", "That really is America's ass", setOf(Permissions.all()))
|
||||
val spy = User("nsa", "EternalBlue", setOf(Permissions.all()))
|
||||
val clientIdForUser = UUID.randomUUID().toString()
|
||||
val clientIdForSpy = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
val flowHandleStartedByUser = nodeA.rpc.startFlowWithClientId(clientIdForUser, ::ResultFlow, 5)
|
||||
|
||||
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
val flowHandleStartedBySpy = it.proxy.startFlowWithClientId(clientIdForSpy, ::ResultFlow, 10)
|
||||
|
||||
flowHandleStartedByUser.returnValue.getOrThrow(20.seconds)
|
||||
flowHandleStartedBySpy.returnValue.getOrThrow(20.seconds)
|
||||
|
||||
val userFinishedFlows = nodeA.rpc.finishedFlowsWithClientIds()
|
||||
val spyFinishedFlows = it.proxy.finishedFlowsWithClientIds()
|
||||
|
||||
assertEquals(1, userFinishedFlows.size)
|
||||
assertEquals(clientIdForUser, userFinishedFlows.keys.single())
|
||||
assertEquals(5, nodeA.rpc.reattachFlowWithClientId<Int>(userFinishedFlows.keys.single())!!.returnValue.getOrThrow())
|
||||
assertEquals(1, spyFinishedFlows.size)
|
||||
assertEquals(clientIdForSpy, spyFinishedFlows.keys.single())
|
||||
assertEquals(10, it.proxy.reattachFlowWithClientId<Int>(spyFinishedFlows.keys.single())!!.returnValue.getOrThrow())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `finishedFlowsWithClientIdsAsAdmin does return flows started by other users`() {
|
||||
val user = User("CaptainAmerica", "That really is America's ass", setOf(Permissions.all()))
|
||||
val spy = User("nsa", "EternalBlue", setOf(Permissions.all()))
|
||||
val clientIdForUser = UUID.randomUUID().toString()
|
||||
val clientIdForSpy = UUID.randomUUID().toString()
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
|
||||
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
|
||||
val flowHandleStartedByUser = nodeA.rpc.startFlowWithClientId(clientIdForUser, ::ResultFlow, 5)
|
||||
|
||||
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
|
||||
val flowHandleStartedBySpy = it.proxy.startFlowWithClientId(clientIdForSpy, ::ResultFlow, 10)
|
||||
|
||||
flowHandleStartedByUser.returnValue.getOrThrow(20.seconds)
|
||||
flowHandleStartedBySpy.returnValue.getOrThrow(20.seconds)
|
||||
|
||||
val userFinishedFlows = nodeA.rpc.finishedFlowsWithClientIdsAsAdmin()
|
||||
val spyFinishedFlows = it.proxy.finishedFlowsWithClientIdsAsAdmin()
|
||||
|
||||
assertEquals(2, userFinishedFlows.size)
|
||||
assertEquals(2, spyFinishedFlows.size)
|
||||
assertEquals(userFinishedFlows, spyFinishedFlows)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
|
||||
internal class ResultFlow<A>(private val result: A) : FlowLogic<A>() {
|
||||
companion object {
|
||||
var hook: (() -> Unit)? = null
|
||||
var suspendableHook: FlowLogic<Unit>? = null
|
||||
@ -260,9 +454,9 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>() {
|
||||
internal class UnserializableResultFlow : FlowLogic<OpenFuture<Observable<Unit>>>() {
|
||||
companion object {
|
||||
val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>())}
|
||||
val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>()) }
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@ -272,7 +466,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
internal class HospitalizeFlow: FlowLogic<Unit>() {
|
||||
internal class HospitalizeFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
@ -281,7 +475,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
internal class IsFlowInStatus(private val id: StateMachineRunId, private val ordinal: Int): FlowLogic<Boolean>() {
|
||||
internal class IsFlowInStatus(private val id: StateMachineRunId, private val ordinal: Int) : FlowLogic<Boolean>() {
|
||||
@Suspendable
|
||||
override fun call(): Boolean {
|
||||
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
|
||||
@ -299,7 +493,7 @@ class FlowWithClientIdTest {
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
internal class GetExceptionType(private val id: StateMachineRunId): FlowLogic<String>() {
|
||||
internal class GetExceptionType(private val id: StateMachineRunId) : FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
return serviceHub.jdbcSession().prepareStatement("select type from node_flow_exceptions where flow_id = ?")
|
||||
@ -316,5 +510,5 @@ class FlowWithClientIdTest {
|
||||
|
||||
internal class UnserializableException(
|
||||
val unserializableObject: BrokenMap<Unit, Unit> = BrokenMap()
|
||||
): CordaRuntimeException("123")
|
||||
) : CordaRuntimeException("123")
|
||||
}
|
@ -173,14 +173,18 @@ internal class CordaRPCOpsImpl(
|
||||
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
|
||||
|
||||
override fun <T> reattachFlowWithClientId(clientId: String): FlowHandleWithClientId<T>? {
|
||||
return smm.reattachFlowWithClientId<T>(clientId)?.run {
|
||||
return smm.reattachFlowWithClientId<T>(clientId, context().principal())?.run {
|
||||
FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId)
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId)
|
||||
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId, context().principal(), false)
|
||||
|
||||
override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds()
|
||||
override fun removeClientIdAsAdmin(clientId: String): Boolean = smm.removeClientId(clientId, context().principal(), true)
|
||||
|
||||
override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds(context().principal(), false)
|
||||
|
||||
override fun finishedFlowsWithClientIdsAsAdmin(): Map<String, Boolean> = smm.finishedFlowsWithClientIds(context().principal(), true)
|
||||
|
||||
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
|
||||
|
||||
@ -277,9 +281,8 @@ internal class CordaRPCOpsImpl(
|
||||
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, context: InvocationContext, args: Array<out Any?>): FlowStateMachineHandle<T> {
|
||||
if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType)
|
||||
if (isFlowsDrainingModeEnabled()) {
|
||||
return context.clientId?.let {
|
||||
smm.reattachFlowWithClientId<T>(context.clientId!!)
|
||||
} ?: throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
|
||||
return context.clientId?.let { smm.reattachFlowWithClientId<T>(it, context.principal()) }
|
||||
?: throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
|
||||
}
|
||||
return flowStarter.invokeFlowAsync(logicType, context, *args).getOrThrow()
|
||||
}
|
||||
|
@ -144,31 +144,35 @@ private object RPCPermissionResolver : PermissionResolver {
|
||||
private const val ACTION_INVOKE_RPC = "invokerpc"
|
||||
private const val ACTION_ALL = "all"
|
||||
private val FLOW_RPC_CALLS = setOf(
|
||||
"startFlowDynamic",
|
||||
"startTrackedFlowDynamic",
|
||||
"startFlow",
|
||||
"startTrackedFlow")
|
||||
"startFlowDynamic",
|
||||
"startTrackedFlowDynamic",
|
||||
"startFlowDynamicWithClientId",
|
||||
"startFlow",
|
||||
"startTrackedFlow",
|
||||
"startFlowWithClientId"
|
||||
)
|
||||
|
||||
private val FLOW_RPC_PERMITTED_START_FLOW_CALLS = setOf("startFlow", "startFlowDynamic")
|
||||
private val FLOW_RPC_PERMITTED_TRACKED_START_FLOW_CALLS = setOf("startTrackedFlow", "startTrackedFlowDynamic")
|
||||
private val FLOW_RPC_PERMITTED_START_FLOW_WITH_CLIENT_ID_CALLS = setOf("startFlowWithClientId", "startFlowDynamicWithClientId")
|
||||
|
||||
override fun resolvePermission(representation: String): Permission {
|
||||
val action = representation.substringBefore(SEPARATOR).toLowerCase()
|
||||
val action = representation.substringBefore(SEPARATOR).toLowerCase()
|
||||
when (action) {
|
||||
ACTION_INVOKE_RPC -> {
|
||||
val rpcCall = representation.substringAfter(SEPARATOR, "")
|
||||
require(representation.count { it == SEPARATOR } == 1 && !rpcCall.isEmpty()) {
|
||||
"Malformed permission string"
|
||||
}
|
||||
val permitted = when(rpcCall) {
|
||||
"startFlow" -> setOf("startFlowDynamic", rpcCall)
|
||||
"startTrackedFlow" -> setOf("startTrackedFlowDynamic", rpcCall)
|
||||
require(representation.count { it == SEPARATOR } == 1 && rpcCall.isNotEmpty()) { "Malformed permission string" }
|
||||
val permitted = when (rpcCall) {
|
||||
"startFlow" -> FLOW_RPC_PERMITTED_START_FLOW_CALLS
|
||||
"startTrackedFlow" -> FLOW_RPC_PERMITTED_TRACKED_START_FLOW_CALLS
|
||||
"startFlowWithClientId" -> FLOW_RPC_PERMITTED_START_FLOW_WITH_CLIENT_ID_CALLS
|
||||
else -> setOf(rpcCall)
|
||||
}
|
||||
return RPCPermission(permitted)
|
||||
}
|
||||
ACTION_START_FLOW -> {
|
||||
val targetFlow = representation.substringAfter(SEPARATOR, "")
|
||||
require(targetFlow.isNotEmpty()) {
|
||||
"Missing target flow after StartFlow"
|
||||
}
|
||||
require(targetFlow.isNotEmpty()) { "Missing target flow after StartFlow" }
|
||||
return RPCPermission(FLOW_RPC_CALLS, targetFlow)
|
||||
}
|
||||
ACTION_ALL -> {
|
||||
|
@ -26,6 +26,7 @@ import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
import org.apache.commons.lang3.exception.ExceptionUtils
|
||||
import org.hibernate.annotations.Type
|
||||
import java.security.Principal
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLException
|
||||
import java.time.Clock
|
||||
@ -572,7 +573,12 @@ class DBCheckpointStorage(
|
||||
override fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>> {
|
||||
val session = currentDBSession()
|
||||
val jpqlQuery =
|
||||
"""select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier)
|
||||
"""select new ${DBFlowResultMetadataFields::class.java.name}(
|
||||
checkpoint.id,
|
||||
checkpoint.status,
|
||||
metadata.userSuppliedIdentifier,
|
||||
metadata.startedBy
|
||||
)
|
||||
from ${DBFlowCheckpoint::class.java.name} checkpoint
|
||||
join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata
|
||||
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal}
|
||||
@ -580,7 +586,8 @@ class DBCheckpointStorage(
|
||||
or checkpoint.status = ${FlowStatus.KILLED.ordinal}""".trimIndent()
|
||||
val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java)
|
||||
return query.resultList.stream().map {
|
||||
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId)
|
||||
val startedBy = it.startedBy
|
||||
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId, Principal { startedBy })
|
||||
}
|
||||
}
|
||||
|
||||
@ -753,7 +760,8 @@ class DBCheckpointStorage(
|
||||
private class DBFlowResultMetadataFields(
|
||||
val id: String,
|
||||
val status: FlowStatus,
|
||||
val clientId: String?
|
||||
val clientId: String?,
|
||||
val startedBy: String
|
||||
)
|
||||
|
||||
private fun <T : Any> T.storageSerialize(): SerializedBytes<T> {
|
||||
|
@ -6,6 +6,7 @@ import co.paralleluniverse.fibers.instrument.JavaAgent
|
||||
import co.paralleluniverse.strands.channels.Channel
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.flows.FlowException
|
||||
@ -48,6 +49,7 @@ import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
import net.corda.serialization.internal.withTokenContext
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import rx.Observable
|
||||
import java.security.Principal
|
||||
import java.security.SecureRandom
|
||||
import java.util.ArrayList
|
||||
import java.util.HashSet
|
||||
@ -179,7 +181,7 @@ internal class SingleThreadedStateMachineManager(
|
||||
flowTimeoutScheduler::resetCustomTimeout
|
||||
)
|
||||
|
||||
val (fibers, pausedFlows) = restoreFlowsFromCheckpoints()
|
||||
val (flows, pausedFlows) = restoreFlowsFromCheckpoints()
|
||||
metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size })
|
||||
|
||||
setFlowDefaultUncaughtExceptionHandler()
|
||||
@ -195,35 +197,40 @@ internal class SingleThreadedStateMachineManager(
|
||||
}
|
||||
|
||||
// - Incompatible checkpoints need to be handled upon implementing CORDA-3897
|
||||
for (flow in fibers.values) {
|
||||
for ((id, flow) in flows) {
|
||||
flow.fiber.clientId?.let {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber))
|
||||
}
|
||||
}
|
||||
|
||||
for (pausedFlow in pausedFlows) {
|
||||
pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(
|
||||
pausedFlow.key,
|
||||
doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it)
|
||||
flowId = id,
|
||||
user = flow.fiber.transientState.checkpoint.checkpointState.invocationContext.principal(),
|
||||
flowStateMachineFuture = doneFuture(flow.fiber)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
val finishedFlowsResults = checkpointStorage.getFinishedFlowsResultsMetadata().toList()
|
||||
for ((id, finishedFlowResult) in finishedFlowsResults) {
|
||||
finishedFlowResult.clientId?.let {
|
||||
if (finishedFlowResult.status == Checkpoint.FlowStatus.COMPLETED) {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, true)
|
||||
} else {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, false)
|
||||
}
|
||||
for ((id, pausedFlow) in pausedFlows) {
|
||||
pausedFlow.checkpoint.checkpointState.invocationContext.clientId?.let { clientId ->
|
||||
innerState.clientIdsToFlowIds[clientId] = FlowWithClientIdStatus.Active(
|
||||
flowId = id,
|
||||
user = pausedFlow.checkpoint.checkpointState.invocationContext.principal(),
|
||||
flowStateMachineFuture = doneClientIdFuture(id, pausedFlow.resultFuture, clientId)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
val finishedFlows = checkpointStorage.getFinishedFlowsResultsMetadata().toList()
|
||||
for ((id, finishedFlow) in finishedFlows) {
|
||||
finishedFlow.clientId?.let {
|
||||
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(
|
||||
flowId = id,
|
||||
user = finishedFlow.user,
|
||||
succeeded = finishedFlow.status == Checkpoint.FlowStatus.COMPLETED
|
||||
)
|
||||
} ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
|
||||
}
|
||||
|
||||
return {
|
||||
logger.info("Node ready, info: ${serviceHub.myInfo}")
|
||||
resumeRestoredFlows(fibers)
|
||||
resumeRestoredFlows(flows)
|
||||
flowMessaging.start { _, deduplicationHandler ->
|
||||
executor.execute {
|
||||
deliverExternalEvent(deduplicationHandler.externalCause)
|
||||
@ -288,7 +295,7 @@ internal class SingleThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod")
|
||||
@Suppress("ComplexMethod", "NestedBlockDepth")
|
||||
private fun <A> startFlow(
|
||||
flowId: StateMachineRunId,
|
||||
flowLogic: FlowLogic<A>,
|
||||
@ -310,7 +317,7 @@ internal class SingleThreadedStateMachineManager(
|
||||
status
|
||||
} else {
|
||||
newFuture = openFuture()
|
||||
FlowWithClientIdStatus.Active(flowId, newFuture!!)
|
||||
FlowWithClientIdStatus.Active(flowId, context.principal(), newFuture!!)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -320,6 +327,13 @@ internal class SingleThreadedStateMachineManager(
|
||||
// If the flow ID is the same as the one recorded in the client ID map,
|
||||
// then this start flow event has been retried, and we should not de-duplicate.
|
||||
if (flowId != it.flowId) {
|
||||
// If the user that started the original flow is not the same as the user making the current request,
|
||||
// return an exception as they are not permitted to see the result of the flow
|
||||
if (!it.isPermitted(context.principal())) {
|
||||
return@startFlow openFuture<FlowStateMachineHandle<A>>().apply {
|
||||
setException(PermissionException("A flow using this client id [$clientId] has already been started by another user"))
|
||||
}
|
||||
}
|
||||
val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
|
||||
return@startFlow uncheckedCast(existingFuture)
|
||||
}
|
||||
@ -1067,8 +1081,9 @@ internal class SingleThreadedStateMachineManager(
|
||||
succeeded: Boolean
|
||||
) {
|
||||
clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
|
||||
require(existingStatus != null && existingStatus is FlowWithClientIdStatus.Active)
|
||||
FlowWithClientIdStatus.Removed(id, succeeded)
|
||||
val status = requireNotNull(existingStatus)
|
||||
require(existingStatus is FlowWithClientIdStatus.Active)
|
||||
FlowWithClientIdStatus.Removed(flowId = id, user = status.user, succeeded = succeeded)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1106,11 +1121,15 @@ internal class SingleThreadedStateMachineManager(
|
||||
}
|
||||
)
|
||||
|
||||
override fun <T> reattachFlowWithClientId(clientId: String): FlowStateMachineHandle<T>? {
|
||||
override fun <T> reattachFlowWithClientId(clientId: String, user: Principal): FlowStateMachineHandle<T>? {
|
||||
return innerState.withLock {
|
||||
clientIdsToFlowIds[clientId]?.let {
|
||||
val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId)
|
||||
existingFuture?.let { uncheckedCast(existingFuture.get()) }
|
||||
if (!it.isPermitted(user)) {
|
||||
null
|
||||
} else {
|
||||
val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId)
|
||||
uncheckedCast(existingFuture?.let {existingFuture.get() })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1147,11 +1166,11 @@ internal class SingleThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeClientId(clientId: String): Boolean {
|
||||
override fun removeClientId(clientId: String, user: Principal, isAdmin: Boolean): Boolean {
|
||||
var removedFlowId: StateMachineRunId? = null
|
||||
innerState.withLock {
|
||||
clientIdsToFlowIds.computeIfPresent(clientId) { _, existingStatus ->
|
||||
if (existingStatus is FlowWithClientIdStatus.Removed) {
|
||||
if (existingStatus is FlowWithClientIdStatus.Removed && (existingStatus.isPermitted(user) || isAdmin)) {
|
||||
removedFlowId = existingStatus.flowId
|
||||
null
|
||||
} else {
|
||||
@ -1166,9 +1185,10 @@ internal class SingleThreadedStateMachineManager(
|
||||
return false
|
||||
}
|
||||
|
||||
override fun finishedFlowsWithClientIds(): Map<String, Boolean> {
|
||||
override fun finishedFlowsWithClientIds(user: Principal, isAdmin: Boolean): Map<String, Boolean> {
|
||||
return innerState.withLock {
|
||||
clientIdsToFlowIds.asSequence()
|
||||
.filter { (_, status) -> status.isPermitted(user) || isAdmin }
|
||||
.filter { (_, status) -> status is FlowWithClientIdStatus.Removed }
|
||||
.map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded }
|
||||
.toMap()
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.utilities.Try
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import rx.Observable
|
||||
import java.security.Principal
|
||||
|
||||
/**
|
||||
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
|
||||
@ -112,14 +113,14 @@ interface StateMachineManager {
|
||||
*
|
||||
* @param clientId The client id relating to an existing flow
|
||||
*/
|
||||
fun <T> reattachFlowWithClientId(clientId: String): FlowStateMachineHandle<T>?
|
||||
fun <T> reattachFlowWithClientId(clientId: String, user: Principal): FlowStateMachineHandle<T>?
|
||||
|
||||
/**
|
||||
* Removes a flow's [clientId] to result/ exception mapping.
|
||||
*
|
||||
* @return whether the mapping was removed.
|
||||
*/
|
||||
fun removeClientId(clientId: String): Boolean
|
||||
fun removeClientId(clientId: String, user: Principal, isAdmin: Boolean): Boolean
|
||||
|
||||
/**
|
||||
* Returns all finished flows that were started with a client id.
|
||||
@ -127,7 +128,7 @@ interface StateMachineManager {
|
||||
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
|
||||
* [false] if completed exceptionally.
|
||||
*/
|
||||
fun finishedFlowsWithClientIds(): Map<String, Boolean>
|
||||
fun finishedFlowsWithClientIds(user: Principal, isAdmin: Boolean): Map<String, Boolean>
|
||||
}
|
||||
|
||||
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call
|
||||
|
@ -23,6 +23,7 @@ import net.corda.core.serialization.internal.checkpointDeserialize
|
||||
import net.corda.core.utilities.Try
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import java.lang.IllegalStateException
|
||||
import java.security.Principal
|
||||
import java.time.Instant
|
||||
import java.util.concurrent.Future
|
||||
import java.util.concurrent.Semaphore
|
||||
@ -424,16 +425,21 @@ sealed class SubFlowVersion {
|
||||
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
|
||||
}
|
||||
|
||||
sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) {
|
||||
sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId, val user: Principal) {
|
||||
|
||||
fun isPermitted(user: Principal): Boolean = user.name == this.user.name
|
||||
|
||||
class Active(
|
||||
flowId: StateMachineRunId,
|
||||
user: Principal,
|
||||
val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>
|
||||
) : FlowWithClientIdStatus(flowId)
|
||||
) : FlowWithClientIdStatus(flowId, user)
|
||||
|
||||
class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId)
|
||||
class Removed(flowId: StateMachineRunId, user: Principal, val succeeded: Boolean) : FlowWithClientIdStatus(flowId, user)
|
||||
}
|
||||
|
||||
data class FlowResultMetadata(
|
||||
val status: Checkpoint.FlowStatus,
|
||||
val clientId: String?
|
||||
val clientId: String?,
|
||||
val user: Principal
|
||||
)
|
@ -21,6 +21,7 @@ import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.newContext
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import net.corda.testing.node.internal.startFlowWithClientId
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
@ -340,7 +341,7 @@ class FlowClientIdTests {
|
||||
ResultFlow.hook = { counter++ }
|
||||
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
|
||||
flowHandle0.resultFuture.getOrThrow(20.seconds)
|
||||
val removed = aliceNode.smm.removeClientId(clientId)
|
||||
val removed = aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
|
||||
// On new request with clientId, after the same clientId was removed, a brand new flow will start with that clientId
|
||||
val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
|
||||
flowHandle1.resultFuture.getOrThrow(20.seconds)
|
||||
@ -363,7 +364,7 @@ class FlowClientIdTests {
|
||||
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
|
||||
}
|
||||
|
||||
aliceNode.smm.removeClientId(clientId)
|
||||
aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
|
||||
|
||||
// assert database status after remove
|
||||
aliceNode.services.database.transaction {
|
||||
@ -374,7 +375,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `removing a client id exception clears resources properly`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
ResultFlow.hook = { throw IllegalStateException() }
|
||||
@ -389,7 +390,7 @@ class FlowClientIdTests {
|
||||
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
|
||||
}
|
||||
|
||||
aliceNode.smm.removeClientId(clientId)
|
||||
aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
|
||||
|
||||
// assert database status after remove
|
||||
aliceNode.services.database.transaction {
|
||||
@ -400,7 +401,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow's client id mapping can only get removed once the flow gets removed`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
var tries = 0
|
||||
@ -417,7 +418,7 @@ class FlowClientIdTests {
|
||||
|
||||
var removed = false
|
||||
while (!removed) {
|
||||
removed = aliceNode.smm.removeClientId(clientId)
|
||||
removed = aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
|
||||
if (!removed) ++failedRemovals
|
||||
++tries
|
||||
if (tries >= maxTries) {
|
||||
@ -636,7 +637,7 @@ class FlowClientIdTests {
|
||||
assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `completed flow started with a client id nulls its flow state in database after its lifetime`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
|
||||
@ -648,7 +649,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@Test(timeout = 300_000)
|
||||
fun `failed flow started with a client id nulls its flow state in database after its lifetime`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
ResultFlow.hook = { throw IllegalStateException() }
|
||||
@ -664,11 +665,12 @@ class FlowClientIdTests {
|
||||
assertNull(dbFlowCheckpoint!!.blob!!.flowStack)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattachFlowWithClientId can retrieve existing flow future`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
|
||||
|
||||
assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds))
|
||||
assertEquals(clientId, flowHandle.clientId)
|
||||
@ -680,7 +682,7 @@ class FlowClientIdTests {
|
||||
fun `reattachFlowWithClientId can retrieve a null result from a flow future`() {
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null))
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
|
||||
|
||||
assertEquals(null, flowHandle.resultFuture.getOrThrow(20.seconds))
|
||||
assertEquals(clientId, flowHandle.clientId)
|
||||
@ -696,7 +698,7 @@ class FlowClientIdTests {
|
||||
assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds))
|
||||
assertEquals(clientId, flowHandle.clientId)
|
||||
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
|
||||
|
||||
assertEquals(flowHandle.id, reattachedFlowHandle?.id)
|
||||
assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get())
|
||||
@ -704,7 +706,7 @@ class FlowClientIdTests {
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reattachFlowWithClientId returns null if no flow matches the client id`() {
|
||||
assertEquals(null, aliceNode.smm.reattachFlowWithClientId<Int>(UUID.randomUUID().toString()))
|
||||
assertEquals(null, aliceNode.smm.reattachFlowWithClientId<Int>(UUID.randomUUID().toString(), aliceNode.user))
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
@ -712,7 +714,7 @@ class FlowClientIdTests {
|
||||
ResultFlow.hook = { throw IllegalStateException("Bla bla bla") }
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
|
||||
|
||||
assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
|
||||
flowHandle.resultFuture.getOrThrow(20.seconds)
|
||||
@ -733,7 +735,7 @@ class FlowClientIdTests {
|
||||
flowHandle.resultFuture.getOrThrow(20.seconds)
|
||||
}.withMessage("Bla bla bla")
|
||||
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
|
||||
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
|
||||
|
||||
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable
|
||||
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
|
||||
@ -753,7 +755,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
|
||||
assertFailsWith<KilledFlowException> {
|
||||
aliceNode.smm.reattachFlowWithClientId<Int>(clientId)?.resultFuture?.getOrThrow()
|
||||
aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)?.resultFuture?.getOrThrow()
|
||||
}
|
||||
}
|
||||
|
||||
@ -779,7 +781,7 @@ class FlowClientIdTests {
|
||||
flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds)
|
||||
assertFailsWith<java.lang.IllegalStateException> { failedFlow.resultFuture.getOrThrow(20.seconds) }
|
||||
|
||||
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds()
|
||||
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds(aliceNode.user, false)
|
||||
|
||||
lock.countDown()
|
||||
|
||||
@ -791,11 +793,13 @@ class FlowClientIdTests {
|
||||
|
||||
assertEquals(
|
||||
listOf(10, 10, 10),
|
||||
finishedFlows.filterValues { it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.get() }
|
||||
finishedFlows.filterValues { it }
|
||||
.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key, aliceNode.user)?.resultFuture?.get() }
|
||||
)
|
||||
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable
|
||||
assertFailsWith<CordaRuntimeException> {
|
||||
finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.getOrThrow() }
|
||||
finishedFlows.filterValues { !it }
|
||||
.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key, aliceNode.user)?.resultFuture?.getOrThrow() }
|
||||
}
|
||||
}
|
||||
|
||||
@ -810,13 +814,16 @@ class FlowClientIdTests {
|
||||
flowHandle0.resultFuture.getOrThrow()
|
||||
}
|
||||
|
||||
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds()
|
||||
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds(aliceNode.user, false)
|
||||
|
||||
assertFailsWith<KilledFlowException> {
|
||||
finishedFlows.keys.single().let { aliceNode.smm.reattachFlowWithClientId<Int>(it)?.resultFuture?.getOrThrow() }
|
||||
finishedFlows.keys.single()
|
||||
.let { aliceNode.smm.reattachFlowWithClientId<Int>(it, aliceNode.user)?.resultFuture?.getOrThrow() }
|
||||
}
|
||||
}
|
||||
|
||||
private val TestStartedNode.user get() = services.newContext().principal()
|
||||
|
||||
private fun TestStartedNode.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean {
|
||||
return services.database.transaction {
|
||||
services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
|
||||
@ -870,7 +877,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
}
|
||||
|
||||
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
|
||||
internal class ResultFlow<A>(private val result: A) : FlowLogic<A>() {
|
||||
companion object {
|
||||
var hook: ((String?) -> Unit)? = null
|
||||
var suspendableHook: FlowLogic<Unit>? = null
|
||||
@ -884,7 +891,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
}
|
||||
|
||||
internal class UnSerializableResultFlow: FlowLogic<Any>() {
|
||||
internal class UnSerializableResultFlow : FlowLogic<Any>() {
|
||||
companion object {
|
||||
var firstRun = true
|
||||
}
|
||||
@ -901,7 +908,7 @@ class FlowClientIdTests {
|
||||
}
|
||||
}
|
||||
|
||||
internal class HospitalizeFlow: FlowLogic<Unit>() {
|
||||
internal class HospitalizeFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
|
Loading…
Reference in New Issue
Block a user