CORDA-3881 Get all finished flows with client ids (#6580)

Return map of `clientId` -> success/fail
This commit is contained in:
Dan Newton 2020-08-17 10:27:32 +01:00 committed by GitHub
parent f1b7bc9dcb
commit 854e6638ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 7 deletions

View File

@ -321,6 +321,14 @@ interface CordaRPCOps : RPCOps {
*/
fun removeClientId(clientId: String): Boolean
/**
* Returns all finished flows that were started with a client id.
*
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
* [false] if completed exceptionally.
*/
fun finishedFlowsWithClientIds(): Map<String, Boolean>
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo

View File

@ -140,6 +140,17 @@ class FlowWithClientIdTest {
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
}
@Test(timeout=300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
val finishedFlows = nodeA.rpc.finishedFlowsWithClientIds()
assertEquals(true, finishedFlows[clientId])
}
}
}
@StartableByRPC

View File

@ -180,6 +180,8 @@ internal class CordaRPCOpsImpl(
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId)
override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds()
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
val (allStateMachines, changes) = smm.track()

View File

@ -813,7 +813,7 @@ internal class SingleThreadedStateMachineManager(
val checkpoint = currentState.checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED, flowState = FlowState.Paused)
val pausedFlow = NonResidentFlow(
id,
checkpoint,
checkpoint,
flow.resultFuture,
hospitalized = currentState.checkpoint.status == Checkpoint.FlowStatus.HOSPITALIZED,
progressTracker = currentState.flowLogic.progressTracker
@ -1105,6 +1105,15 @@ internal class SingleThreadedStateMachineManager(
return false
}
override fun finishedFlowsWithClientIds(): Map<String, Boolean> {
return innerState.withLock {
clientIdsToFlowIds.asSequence()
.filter { (_, status) -> status is FlowWithClientIdStatus.Removed }
.map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded }
.toMap()
}
}
private sealed class CheckpointLoadingStatus {
class Success(val checkpoint: Checkpoint) : CheckpointLoadingStatus()
object NotFound : CheckpointLoadingStatus()

View File

@ -120,6 +120,14 @@ interface StateMachineManager {
* @return whether the mapping was removed.
*/
fun removeClientId(clientId: String): Boolean
/**
* Returns all finished flows that were started with a client id.
*
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
* [false] if completed exceptionally.
*/
fun finishedFlowsWithClientIds(): Map<String, Boolean>
}
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call

View File

@ -4,8 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.persistence.DBCheckpointStorage
@ -18,18 +20,17 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId
import net.corda.core.flows.KilledFlowException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.lang.IllegalArgumentException
import java.sql.SQLTransientConnectionException
import java.util.UUID
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.IllegalStateException
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -775,17 +776,59 @@ class FlowClientIdTests {
reattachedFlowHandle?.resultFuture?.getOrThrow()
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
@Test(timeout = 300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientIds = listOf("a", "b", "c", "d", "e")
val lock = CountDownLatch(1)
ResultFlow.hook = { clientId ->
if (clientId == clientIds[3]) {
throw java.lang.IllegalStateException("This didn't go so well")
}
if (clientId == clientIds[4]) {
lock.await(30, TimeUnit.SECONDS)
}
}
val flows = listOf(
aliceNode.services.startFlowWithClientId(clientIds[0], ResultFlow(10)),
aliceNode.services.startFlowWithClientId(clientIds[1], ResultFlow(10)),
aliceNode.services.startFlowWithClientId(clientIds[2], ResultFlow(10))
)
val failedFlow = aliceNode.services.startFlowWithClientId(clientIds[3], ResultFlow(10))
val runningFlow = aliceNode.services.startFlowWithClientId(clientIds[4], ResultFlow(10))
flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds)
assertFailsWith<java.lang.IllegalStateException> { failedFlow.resultFuture.getOrThrow(20.seconds) }
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds()
lock.countDown()
assertEquals(4, finishedFlows.size)
assertEquals(3, finishedFlows.filterValues { it }.size)
assertEquals(1, finishedFlows.filterValues { !it }.size)
assertEquals(setOf("a", "b", "c", "d"), finishedFlows.map { it.key }.toSet())
assertTrue(runningFlow.clientId !in finishedFlows.keys)
assertEquals(
listOf(10, 10, 10),
finishedFlows.filterValues { it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.get() }
)
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable
assertFailsWith<CordaRuntimeException> {
finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.getOrThrow() }
}
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
companion object {
var hook: (() -> Unit)? = null
var hook: ((String?) -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
}
@Suspendable
override fun call(): A {
hook?.invoke()
hook?.invoke(stateMachine.clientId)
suspendableHook?.let { subFlow(it) }
return result
}