CORDA-3822 Add CordaRPCOps.reattachFlowWithClientId (#6579)

Add `CordaRPCOps.reattachFlowWithClientId` to allow clients to reattach
to an existing flow by only providing a client id. This behaviour is the
same as calling `startFlowDynamicWithClientId` for an existing
`clientId`. Where it differs is `reattachFlowWithClientId` will return
`null` if there is no flow running or finished on the node with the same
client id.

Return `null` if record deleted from race-condition
This commit is contained in:
Dan Newton 2020-08-06 11:42:02 +01:00 committed by GitHub
parent 5ba8477733
commit 3f31aeaa5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 235 additions and 34 deletions

View File

@ -265,11 +265,20 @@ interface CordaRPCOps : RPCOps {
fun <T> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>
/**
* Start the given flow with the given arguments and a [clientId]. [logicType] must be annotated
* with [net.corda.core.flows.StartableByRPC]. The flow's result/ exception will be available for the client to
* re-connect and retrieve them even after the flow's lifetime, by re-calling [startFlowDynamicWithClientId] with the same
* [clientId]. Upon calling [removeClientId], the node's resources holding the result/ exception will be freed
* and the result/ exception will no longer be available.
* Start the given flow with the given arguments and a [clientId].
*
* The flow's result/ exception will be available for the client to re-connect and retrieve even after the flow's lifetime,
* by re-calling [startFlowDynamicWithClientId] with the same [clientId]. The [logicType] and [args] will be ignored if the
* [clientId] matches an existing flow. If you don't have the original values, consider using [reattachFlowWithClientId].
*
* Upon calling [removeClientId], the node's resources holding the result/ exception will be freed and the result/ exception will
* no longer be available.
*
* [logicType] must be annotated with [net.corda.core.flows.StartableByRPC].
*
* @param clientId The client id to relate the flow to (or is already related to if the flow already exists)
* @param logicType The [FlowLogic] to start
* @param args The arguments to pass to the flow
*/
@RPCReturnsObservables
fun <T> startFlowDynamicWithClientId(clientId: String, logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandleWithClientId<T>
@ -288,6 +297,21 @@ interface CordaRPCOps : RPCOps {
*/
fun killFlow(id: StateMachineRunId): Boolean
/**
* Reattach to an existing flow that was started with [startFlowDynamicWithClientId] and has a [clientId].
*
* If there is a flow matching the [clientId] then its result or exception is returned.
*
* When there is no flow matching the [clientId] then [null] is returned directly (not a future/[FlowHandleWithClientId]).
*
* Calling [reattachFlowWithClientId] after [removeClientId] with the same [clientId] will cause the function to return [null] as
* the result/exception of the flow will no longer be available.
*
* @param clientId The client id relating to an existing flow
*/
@RPCReturnsObservables
fun <T> reattachFlowWithClientId(clientId: String): FlowHandleWithClientId<T>?
/**
* Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed.
*

View File

@ -3,15 +3,16 @@ package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.ResultSerializationException
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.flows.ResultSerializationException
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions
import org.junit.Before
import org.junit.Test
import rx.Observable
@ -105,6 +106,40 @@ class FlowWithClientIdTest {
assertEquals(UnserializableException::class.java.name, e1.originalExceptionClassName)
}
}
@Test(timeout=300_000)
fun `reattachFlowWithClientId can retrieve results from existing flow future`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)
assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId)
assertEquals(flowHandle.id, reattachedFlowHandle?.id)
assertEquals(flowHandle.returnValue.get(), reattachedFlowHandle?.returnValue?.get())
}
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve exception from existing flow future`() {
ResultFlow.hook = { throw IllegalStateException("Bla bla bla") }
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable
Assertions.assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
flowHandle.returnValue.getOrThrow(20.seconds)
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
Assertions.assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
reattachedFlowHandle?.returnValue?.getOrThrow()
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
}
}
@StartableByRPC

View File

@ -172,6 +172,12 @@ internal class CordaRPCOpsImpl(
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
override fun <T> reattachFlowWithClientId(clientId: String): FlowHandleWithClientId<T>? {
return smm.reattachFlowWithClientId<T>(clientId)?.run {
FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId)
}
}
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
@ -255,9 +261,14 @@ internal class CordaRPCOpsImpl(
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}
override fun <T> startFlowDynamicWithClientId(clientId: String, logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandleWithClientId<T> {
val stateMachine = startFlow(logicType, context().withClientId(clientId), args)
return FlowHandleWithClientIdImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture, clientId = stateMachine.clientId!!)
override fun <T> startFlowDynamicWithClientId(
clientId: String,
logicType: Class<out FlowLogic<T>>,
vararg args: Any?
): FlowHandleWithClientId<T> {
return startFlow(logicType, context().withClientId(clientId), args).run {
FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId)
}
}
@Suppress("SpreadOperator")

View File

@ -926,7 +926,7 @@ internal class SingleThreadedStateMachineManager(
id: StateMachineRunId,
resultFuture: CordaFuture<Any?>,
clientId: String
): CordaFuture<FlowStateMachineHandle<Any?>> =
): CordaFuture<FlowStateMachineHandle<out Any?>> =
doneFuture(object : FlowStateMachineHandle<Any?> {
override val logic: Nothing? = null
override val id: StateMachineRunId = id
@ -935,6 +935,47 @@ internal class SingleThreadedStateMachineManager(
}
)
override fun <T> reattachFlowWithClientId(clientId: String): FlowStateMachineHandle<T>? {
return innerState.withLock {
clientIdsToFlowIds[clientId]?.let {
val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId)
existingFuture?.let { uncheckedCast(existingFuture.get()) }
}
}
}
@Suppress("NestedBlockDepth")
private fun activeOrRemovedClientIdFutureForReattach(
existingStatus: FlowWithClientIdStatus,
clientId: String
): CordaFuture<out FlowStateMachineHandle<out Any?>>? {
return when (existingStatus) {
is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
is FlowWithClientIdStatus.Removed -> {
val flowId = existingStatus.flowId
val resultFuture = if (existingStatus.succeeded) {
try {
val flowResult =
database.transaction { checkpointStorage.getFlowResult(existingStatus.flowId, throwIfMissing = true) }
doneFuture(flowResult)
} catch (e: IllegalStateException) {
null
}
} else {
try {
val flowException =
database.transaction { checkpointStorage.getFlowException(existingStatus.flowId, throwIfMissing = true) }
openFuture<Any?>().apply { setException(flowException as Throwable) }
} catch (e: IllegalStateException) {
null
}
}
resultFuture?.let { doneClientIdFuture(flowId, it, clientId) }
}
}
}
override fun removeClientId(clientId: String): Boolean {
var removedFlowId: StateMachineRunId? = null
innerState.withLock {

View File

@ -7,6 +7,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandleWithClientId
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
@ -99,6 +100,20 @@ interface StateMachineManager {
*/
fun snapshot(): Set<FlowStateMachineImpl<*>>
/**
* Reattach to an existing flow that was started with [startFlowDynamicWithClientId] and has a [clientId].
*
* If there is a flow matching the [clientId] then its result or exception is returned.
*
* When there is no flow matching the [clientId] then [null] is returned directly (not a future/[FlowHandleWithClientId]).
*
* Calling [reattachFlowWithClientId] after [removeClientId] with the same [clientId] will cause the function to return [null] as
* the result/exception of the flow will no longer be available.
*
* @param clientId The client id relating to an existing flow
*/
fun <T> reattachFlowWithClientId(clientId: String): FlowStateMachineHandle<T>?
/**
* Removes a flow's [clientId] to result/ exception mapping.
*

View File

@ -19,16 +19,17 @@ import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId
import net.corda.core.flows.KilledFlowException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.lang.IllegalArgumentException
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import kotlin.IllegalStateException
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -49,7 +50,6 @@ class FlowClientIdTests {
)
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
}
@After
@ -66,7 +66,7 @@ class FlowClientIdTests {
StaffedFlowHospital.onFlowErrorPropagated.clear()
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `no new flow starts if the client id provided pre exists`() {
var counter = 0
ResultFlow.hook = { counter++ }
@ -76,7 +76,7 @@ class FlowClientIdTests {
Assert.assertEquals(1, counter)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow's result gets persisted if the flow is started with a client id`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)).resultFuture.getOrThrow()
@ -86,7 +86,7 @@ class FlowClientIdTests {
}
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow's result is retrievable after flow's lifetime, when flow is started with a client id - different parameters are ignored`() {
val clientId = UUID.randomUUID().toString()
val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
@ -104,7 +104,7 @@ class FlowClientIdTests {
Assert.assertEquals(result0, result1)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `if flow's result is not found in the database an IllegalStateException is thrown`() {
val clientId = UUID.randomUUID().toString()
val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
@ -121,7 +121,7 @@ class FlowClientIdTests {
}
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow returning null gets retrieved after flow's lifetime when started with client id`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)).resultFuture.getOrThrow()
@ -130,7 +130,7 @@ class FlowClientIdTests {
assertNull(flowResult)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow returning Unit gets retrieved after flow's lifetime when started with client id`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow()
@ -139,7 +139,7 @@ class FlowClientIdTests {
assertEquals(Unit, flowResult)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow's result is available if reconnect after flow had retried from previous checkpoint, when flow is started with a client id`() {
var firstRun = true
ResultFlow.hook = {
@ -155,7 +155,7 @@ class FlowClientIdTests {
Assert.assertEquals(result0, result1)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow's result is available if reconnect during flow's retrying from previous checkpoint, when flow is started with a client id`() {
var firstRun = true
val waitForSecondRequest = Semaphore(0)
@ -186,7 +186,7 @@ class FlowClientIdTests {
Assert.assertEquals(result0, result1)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `failing flow's exception is available after flow's lifetime if flow is started with a client id`() {
var counter = 0
ResultFlow.hook = {
@ -212,7 +212,7 @@ class FlowClientIdTests {
assertEquals(1, counter)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `failed flow's exception is available after flow's lifetime on node start if flow was started with a client id`() {
var counter = 0
ResultFlow.hook = {
@ -240,7 +240,7 @@ class FlowClientIdTests {
assertEquals(1, counter)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `killing a flow, removes the flow from the client id mapping`() {
var counter = 0
val flowIsRunning = Semaphore(0)
@ -256,7 +256,6 @@ class FlowClientIdTests {
waitUntilFlowIsRunning.release()
flowIsRunning.acquire()
}
}
}
val clientId = UUID.randomUUID().toString()
@ -278,7 +277,7 @@ class FlowClientIdTests {
assertEquals(2, counter)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow's client id mapping gets removed upon request`() {
val clientId = UUID.randomUUID().toString()
var counter = 0
@ -296,7 +295,7 @@ class FlowClientIdTests {
Assert.assertEquals(2, counter)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `removing a client id result clears resources properly`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
@ -375,7 +374,7 @@ class FlowClientIdTests {
Assert.assertEquals(maxTries, failedRemovals)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `only one flow starts upon concurrent requests with the same client id`() {
val requests = 2
val counter = AtomicInteger(0)
@ -422,7 +421,7 @@ class FlowClientIdTests {
Assert.assertEquals(10, resultsCounter.get())
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `on node start -running- flows with client id are hook-able`() {
val clientId = UUID.randomUUID().toString()
var firstRun = true
@ -511,7 +510,7 @@ class FlowClientIdTests {
// Assert.assertEquals(1, noSecondFlowWasSpawned)
// }
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `on node start -completed- flows with client id are hook-able`() {
val clientId = UUID.randomUUID().toString()
var counter = 0
@ -533,7 +532,7 @@ class FlowClientIdTests {
Assert.assertEquals(5, result1)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `On 'startFlowInternal' throwing, subsequent request with same client id does not get de-duplicated and starts a new flow`() {
val clientId = UUID.randomUUID().toString()
var firstRequest = true
@ -595,7 +594,7 @@ class FlowClientIdTests {
// assertEquals(0, counter)
// }
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `if flow fails to serialize its result then the result gets converted to an exception result`() {
val clientId = UUID.randomUUID().toString()
assertFailsWith<CordaRuntimeException> {
@ -619,7 +618,7 @@ class FlowClientIdTests {
* The below test does not follow a valid path. Normally it should error and propagate.
* However, we want to assert that a flow that fails to serialize its result its retriable.
*/
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow failing to serialize its result gets retried and succeeds if returning a different result`() {
val clientId = UUID.randomUUID().toString()
// before the hospital schedules a [Event.Error] we manually schedule a [Event.RetryFlowFromSafePoint]
@ -630,7 +629,7 @@ class FlowClientIdTests {
assertEquals(5, result)
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `flow that fails does not retain its checkpoint nor its exception in the database if not started with a client id`() {
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlow(ExceptionFlow { IllegalStateException("another exception") }).resultFuture.getOrThrow()
@ -644,7 +643,7 @@ class FlowClientIdTests {
}
}
@Test(timeout=300_000)
@Test(timeout = 300_000)
fun `subsequent request to failed flow that cannot find a 'DBFlowException' in the database, fails with 'IllegalStateException'`() {
ResultFlow.hook = {
// just throwing a different exception from the one expected out of startFlowWithClientId second call below ([IllegalStateException])
@ -700,6 +699,82 @@ class FlowClientIdTests {
assertNull(dbFlowCheckpoint!!.blob!!.flowStack)
}
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve existing flow future`() {
val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId)
assertEquals(flowHandle.id, reattachedFlowHandle?.id)
assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get())
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve a null result from a flow future`() {
val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null))
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
assertEquals(null, flowHandle.resultFuture.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId)
assertEquals(flowHandle.id, reattachedFlowHandle?.id)
assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get())
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve result from completed flow`() {
val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId)
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
assertEquals(flowHandle.id, reattachedFlowHandle?.id)
assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get())
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId returns null if no flow matches the client id`() {
assertEquals(null, aliceNode.smm.reattachFlowWithClientId<Int>(UUID.randomUUID().toString()))
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve exception from existing flow future`() {
ResultFlow.hook = { throw IllegalStateException("Bla bla bla") }
val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
flowHandle.resultFuture.getOrThrow(20.seconds)
}.withMessage("Bla bla bla")
assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
reattachedFlowHandle?.resultFuture?.getOrThrow()
}.withMessage("Bla bla bla")
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve exception from completed flow`() {
ResultFlow.hook = { throw IllegalStateException("Bla bla bla") }
val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
flowHandle.resultFuture.getOrThrow(20.seconds)
}.withMessage("Bla bla bla")
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId)
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
reattachedFlowHandle?.resultFuture?.getOrThrow()
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {