From 3f31aeaa5ff9e3646223c5bbd88bbff638178402 Mon Sep 17 00:00:00 2001 From: Dan Newton Date: Thu, 6 Aug 2020 11:42:02 +0100 Subject: [PATCH] CORDA-3822 Add `CordaRPCOps.reattachFlowWithClientId` (#6579) Add `CordaRPCOps.reattachFlowWithClientId` to allow clients to reattach to an existing flow by only providing a client id. This behaviour is the same as calling `startFlowDynamicWithClientId` for an existing `clientId`. Where it differs is `reattachFlowWithClientId` will return `null` if there is no flow running or finished on the node with the same client id. Return `null` if record deleted from race-condition --- .../net/corda/core/messaging/CordaRPCOps.kt | 34 ++++- .../corda/node/flows/FlowWithClientIdTest.kt | 37 +++++- .../corda/node/internal/CordaRPCOpsImpl.kt | 17 ++- .../SingleThreadedStateMachineManager.kt | 43 +++++- .../statemachine/StateMachineManager.kt | 15 +++ .../statemachine/FlowClientIdTests.kt | 123 ++++++++++++++---- 6 files changed, 235 insertions(+), 34 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 0d1a586c33..826f52f0d9 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -265,11 +265,20 @@ interface CordaRPCOps : RPCOps { fun startFlowDynamic(logicType: Class>, vararg args: Any?): FlowHandle /** - * Start the given flow with the given arguments and a [clientId]. [logicType] must be annotated - * with [net.corda.core.flows.StartableByRPC]. The flow's result/ exception will be available for the client to - * re-connect and retrieve them even after the flow's lifetime, by re-calling [startFlowDynamicWithClientId] with the same - * [clientId]. Upon calling [removeClientId], the node's resources holding the result/ exception will be freed - * and the result/ exception will no longer be available. + * Start the given flow with the given arguments and a [clientId]. + * + * The flow's result/ exception will be available for the client to re-connect and retrieve even after the flow's lifetime, + * by re-calling [startFlowDynamicWithClientId] with the same [clientId]. The [logicType] and [args] will be ignored if the + * [clientId] matches an existing flow. If you don't have the original values, consider using [reattachFlowWithClientId]. + * + * Upon calling [removeClientId], the node's resources holding the result/ exception will be freed and the result/ exception will + * no longer be available. + * + * [logicType] must be annotated with [net.corda.core.flows.StartableByRPC]. + * + * @param clientId The client id to relate the flow to (or is already related to if the flow already exists) + * @param logicType The [FlowLogic] to start + * @param args The arguments to pass to the flow */ @RPCReturnsObservables fun startFlowDynamicWithClientId(clientId: String, logicType: Class>, vararg args: Any?): FlowHandleWithClientId @@ -288,6 +297,21 @@ interface CordaRPCOps : RPCOps { */ fun killFlow(id: StateMachineRunId): Boolean + /** + * Reattach to an existing flow that was started with [startFlowDynamicWithClientId] and has a [clientId]. + * + * If there is a flow matching the [clientId] then its result or exception is returned. + * + * When there is no flow matching the [clientId] then [null] is returned directly (not a future/[FlowHandleWithClientId]). + * + * Calling [reattachFlowWithClientId] after [removeClientId] with the same [clientId] will cause the function to return [null] as + * the result/exception of the flow will no longer be available. + * + * @param clientId The client id relating to an existing flow + */ + @RPCReturnsObservables + fun reattachFlowWithClientId(clientId: String): FlowHandleWithClientId? + /** * Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed. * diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt index d182a3f9ec..ec1bff03e5 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt @@ -3,15 +3,16 @@ package net.corda.node.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.CordaRuntimeException import net.corda.core.flows.FlowLogic +import net.corda.core.flows.ResultSerializationException import net.corda.core.flows.StartableByRPC import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.startFlowWithClientId -import net.corda.core.flows.ResultSerializationException import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver +import org.assertj.core.api.Assertions import org.junit.Before import org.junit.Test import rx.Observable @@ -105,6 +106,40 @@ class FlowWithClientIdTest { assertEquals(UnserializableException::class.java.name, e1.originalExceptionClassName) } } + + @Test(timeout=300_000) + fun `reattachFlowWithClientId can retrieve results from existing flow future`() { + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode().getOrThrow() + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId(clientId) + assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds)) + assertEquals(clientId, flowHandle.clientId) + assertEquals(flowHandle.id, reattachedFlowHandle?.id) + assertEquals(flowHandle.returnValue.get(), reattachedFlowHandle?.returnValue?.get()) + } + } + + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve exception from existing flow future`() { + ResultFlow.hook = { throw IllegalStateException("Bla bla bla") } + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode().getOrThrow() + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId(clientId) + + // [CordaRunTimeException] returned because [IllegalStateException] is not serializable + Assertions.assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { + flowHandle.returnValue.getOrThrow(20.seconds) + }.withMessage("java.lang.IllegalStateException: Bla bla bla") + + Assertions.assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { + reattachedFlowHandle?.returnValue?.getOrThrow() + }.withMessage("java.lang.IllegalStateException: Bla bla bla") + } + } } @StartableByRPC diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 66493ac033..1a758a4050 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -172,6 +172,12 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id) + override fun reattachFlowWithClientId(clientId: String): FlowHandleWithClientId? { + return smm.reattachFlowWithClientId(clientId)?.run { + FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId) + } + } + override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId) override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { @@ -255,9 +261,14 @@ internal class CordaRPCOpsImpl( return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) } - override fun startFlowDynamicWithClientId(clientId: String, logicType: Class>, vararg args: Any?): FlowHandleWithClientId { - val stateMachine = startFlow(logicType, context().withClientId(clientId), args) - return FlowHandleWithClientIdImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture, clientId = stateMachine.clientId!!) + override fun startFlowDynamicWithClientId( + clientId: String, + logicType: Class>, + vararg args: Any? + ): FlowHandleWithClientId { + return startFlow(logicType, context().withClientId(clientId), args).run { + FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId) + } } @Suppress("SpreadOperator") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index ae53f5a191..88d1f08330 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -926,7 +926,7 @@ internal class SingleThreadedStateMachineManager( id: StateMachineRunId, resultFuture: CordaFuture, clientId: String - ): CordaFuture> = + ): CordaFuture> = doneFuture(object : FlowStateMachineHandle { override val logic: Nothing? = null override val id: StateMachineRunId = id @@ -935,6 +935,47 @@ internal class SingleThreadedStateMachineManager( } ) + override fun reattachFlowWithClientId(clientId: String): FlowStateMachineHandle? { + return innerState.withLock { + clientIdsToFlowIds[clientId]?.let { + val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId) + existingFuture?.let { uncheckedCast(existingFuture.get()) } + } + } + } + + @Suppress("NestedBlockDepth") + private fun activeOrRemovedClientIdFutureForReattach( + existingStatus: FlowWithClientIdStatus, + clientId: String + ): CordaFuture>? { + return when (existingStatus) { + is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture + is FlowWithClientIdStatus.Removed -> { + val flowId = existingStatus.flowId + val resultFuture = if (existingStatus.succeeded) { + try { + val flowResult = + database.transaction { checkpointStorage.getFlowResult(existingStatus.flowId, throwIfMissing = true) } + doneFuture(flowResult) + } catch (e: IllegalStateException) { + null + } + } else { + try { + val flowException = + database.transaction { checkpointStorage.getFlowException(existingStatus.flowId, throwIfMissing = true) } + openFuture().apply { setException(flowException as Throwable) } + } catch (e: IllegalStateException) { + null + } + } + + resultFuture?.let { doneClientIdFuture(flowId, it, clientId) } + } + } + } + override fun removeClientId(clientId: String): Boolean { var removedFlowId: StateMachineRunId? = null innerState.withLock { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index ee484ff77a..401a8e19dc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -7,6 +7,7 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.FlowHandleWithClientId import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage @@ -99,6 +100,20 @@ interface StateMachineManager { */ fun snapshot(): Set> + /** + * Reattach to an existing flow that was started with [startFlowDynamicWithClientId] and has a [clientId]. + * + * If there is a flow matching the [clientId] then its result or exception is returned. + * + * When there is no flow matching the [clientId] then [null] is returned directly (not a future/[FlowHandleWithClientId]). + * + * Calling [reattachFlowWithClientId] after [removeClientId] with the same [clientId] will cause the function to return [null] as + * the result/exception of the flow will no longer be available. + * + * @param clientId The client id relating to an existing flow + */ + fun reattachFlowWithClientId(clientId: String): FlowStateMachineHandle? + /** * Removes a flow's [clientId] to result/ exception mapping. * diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt index 712fb48473..c945b55aa4 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt @@ -19,16 +19,17 @@ import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.startFlow import net.corda.testing.node.internal.startFlowWithClientId import net.corda.core.flows.KilledFlowException +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After import org.junit.Assert import org.junit.Before import org.junit.Test import rx.Observable import java.lang.IllegalArgumentException -import java.lang.IllegalStateException import java.sql.SQLTransientConnectionException import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import kotlin.IllegalStateException import kotlin.concurrent.thread import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -49,7 +50,6 @@ class FlowClientIdTests { ) aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) - } @After @@ -66,7 +66,7 @@ class FlowClientIdTests { StaffedFlowHospital.onFlowErrorPropagated.clear() } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `no new flow starts if the client id provided pre exists`() { var counter = 0 ResultFlow.hook = { counter++ } @@ -76,7 +76,7 @@ class FlowClientIdTests { Assert.assertEquals(1, counter) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow's result gets persisted if the flow is started with a client id`() { val clientId = UUID.randomUUID().toString() aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)).resultFuture.getOrThrow() @@ -86,7 +86,7 @@ class FlowClientIdTests { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow's result is retrievable after flow's lifetime, when flow is started with a client id - different parameters are ignored`() { val clientId = UUID.randomUUID().toString() val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) @@ -104,7 +104,7 @@ class FlowClientIdTests { Assert.assertEquals(result0, result1) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `if flow's result is not found in the database an IllegalStateException is thrown`() { val clientId = UUID.randomUUID().toString() val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) @@ -121,7 +121,7 @@ class FlowClientIdTests { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow returning null gets retrieved after flow's lifetime when started with client id`() { val clientId = UUID.randomUUID().toString() aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)).resultFuture.getOrThrow() @@ -130,7 +130,7 @@ class FlowClientIdTests { assertNull(flowResult) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow returning Unit gets retrieved after flow's lifetime when started with client id`() { val clientId = UUID.randomUUID().toString() aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow() @@ -139,7 +139,7 @@ class FlowClientIdTests { assertEquals(Unit, flowResult) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow's result is available if reconnect after flow had retried from previous checkpoint, when flow is started with a client id`() { var firstRun = true ResultFlow.hook = { @@ -155,7 +155,7 @@ class FlowClientIdTests { Assert.assertEquals(result0, result1) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow's result is available if reconnect during flow's retrying from previous checkpoint, when flow is started with a client id`() { var firstRun = true val waitForSecondRequest = Semaphore(0) @@ -186,7 +186,7 @@ class FlowClientIdTests { Assert.assertEquals(result0, result1) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `failing flow's exception is available after flow's lifetime if flow is started with a client id`() { var counter = 0 ResultFlow.hook = { @@ -212,7 +212,7 @@ class FlowClientIdTests { assertEquals(1, counter) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `failed flow's exception is available after flow's lifetime on node start if flow was started with a client id`() { var counter = 0 ResultFlow.hook = { @@ -240,7 +240,7 @@ class FlowClientIdTests { assertEquals(1, counter) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `killing a flow, removes the flow from the client id mapping`() { var counter = 0 val flowIsRunning = Semaphore(0) @@ -256,7 +256,6 @@ class FlowClientIdTests { waitUntilFlowIsRunning.release() flowIsRunning.acquire() } - } } val clientId = UUID.randomUUID().toString() @@ -278,7 +277,7 @@ class FlowClientIdTests { assertEquals(2, counter) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow's client id mapping gets removed upon request`() { val clientId = UUID.randomUUID().toString() var counter = 0 @@ -296,7 +295,7 @@ class FlowClientIdTests { Assert.assertEquals(2, counter) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `removing a client id result clears resources properly`() { val clientId = UUID.randomUUID().toString() aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow() @@ -375,7 +374,7 @@ class FlowClientIdTests { Assert.assertEquals(maxTries, failedRemovals) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `only one flow starts upon concurrent requests with the same client id`() { val requests = 2 val counter = AtomicInteger(0) @@ -422,7 +421,7 @@ class FlowClientIdTests { Assert.assertEquals(10, resultsCounter.get()) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `on node start -running- flows with client id are hook-able`() { val clientId = UUID.randomUUID().toString() var firstRun = true @@ -511,7 +510,7 @@ class FlowClientIdTests { // Assert.assertEquals(1, noSecondFlowWasSpawned) // } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `on node start -completed- flows with client id are hook-able`() { val clientId = UUID.randomUUID().toString() var counter = 0 @@ -533,7 +532,7 @@ class FlowClientIdTests { Assert.assertEquals(5, result1) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `On 'startFlowInternal' throwing, subsequent request with same client id does not get de-duplicated and starts a new flow`() { val clientId = UUID.randomUUID().toString() var firstRequest = true @@ -595,7 +594,7 @@ class FlowClientIdTests { // assertEquals(0, counter) // } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `if flow fails to serialize its result then the result gets converted to an exception result`() { val clientId = UUID.randomUUID().toString() assertFailsWith { @@ -619,7 +618,7 @@ class FlowClientIdTests { * The below test does not follow a valid path. Normally it should error and propagate. * However, we want to assert that a flow that fails to serialize its result its retriable. */ - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow failing to serialize its result gets retried and succeeds if returning a different result`() { val clientId = UUID.randomUUID().toString() // before the hospital schedules a [Event.Error] we manually schedule a [Event.RetryFlowFromSafePoint] @@ -630,7 +629,7 @@ class FlowClientIdTests { assertEquals(5, result) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow that fails does not retain its checkpoint nor its exception in the database if not started with a client id`() { assertFailsWith { aliceNode.services.startFlow(ExceptionFlow { IllegalStateException("another exception") }).resultFuture.getOrThrow() @@ -644,7 +643,7 @@ class FlowClientIdTests { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `subsequent request to failed flow that cannot find a 'DBFlowException' in the database, fails with 'IllegalStateException'`() { ResultFlow.hook = { // just throwing a different exception from the one expected out of startFlowWithClientId second call below ([IllegalStateException]) @@ -700,6 +699,82 @@ class FlowClientIdTests { assertNull(dbFlowCheckpoint!!.blob!!.flowStack) } } + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve existing flow future`() { + val clientId = UUID.randomUUID().toString() + val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + + assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds)) + assertEquals(clientId, flowHandle.clientId) + assertEquals(flowHandle.id, reattachedFlowHandle?.id) + assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get()) + } + + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve a null result from a flow future`() { + val clientId = UUID.randomUUID().toString() + val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + + assertEquals(null, flowHandle.resultFuture.getOrThrow(20.seconds)) + assertEquals(clientId, flowHandle.clientId) + assertEquals(flowHandle.id, reattachedFlowHandle?.id) + assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get()) + } + + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve result from completed flow`() { + val clientId = UUID.randomUUID().toString() + val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) + + assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds)) + assertEquals(clientId, flowHandle.clientId) + + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + + assertEquals(flowHandle.id, reattachedFlowHandle?.id) + assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get()) + } + + @Test(timeout = 300_000) + fun `reattachFlowWithClientId returns null if no flow matches the client id`() { + assertEquals(null, aliceNode.smm.reattachFlowWithClientId(UUID.randomUUID().toString())) + } + + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve exception from existing flow future`() { + ResultFlow.hook = { throw IllegalStateException("Bla bla bla") } + val clientId = UUID.randomUUID().toString() + val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + + assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy { + flowHandle.resultFuture.getOrThrow(20.seconds) + }.withMessage("Bla bla bla") + + assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy { + reattachedFlowHandle?.resultFuture?.getOrThrow() + }.withMessage("Bla bla bla") + } + + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve exception from completed flow`() { + ResultFlow.hook = { throw IllegalStateException("Bla bla bla") } + val clientId = UUID.randomUUID().toString() + val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) + + assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy { + flowHandle.resultFuture.getOrThrow(20.seconds) + }.withMessage("Bla bla bla") + + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + + // [CordaRunTimeException] returned because [IllegalStateException] is not serializable + assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { + reattachedFlowHandle?.resultFuture?.getOrThrow() + }.withMessage("java.lang.IllegalStateException: Bla bla bla") + } } internal class ResultFlow(private val result: A): FlowLogic() {