ENT-5750 Keep killed flows that were started with client ids (#6697)

If a flow is started with a client id, do not delete it from the
database. Instead, set the status to `KILLED` and store a
`KilledFlowException` in the database.
Keeping it around allows it to be reattached to using the same
mechanisms as active, completed or failed flows. Furthermore, without
this change it could be possible for a flow to be triggered again if
killed while the reconnecting rpc client is being used.
If there is no client id related to the flow, then kill flow keeps its
original behaviour of deleting all traces of the flow.
Flows cannot be killed if they are `COMPLETED`, `FAILED` or `KILLED`
already.
Logs have been added if requests to kill flows in these statuses are
made.
Do not update the status + persist the exception if the flow was killed
during flow initialisation (before persisting its first checkpoint).
Remove the client id mapping if the flow was killed and did not persist
its original checkpoint.
This commit is contained in:
Dan Newton 2020-09-10 11:58:55 +01:00 committed by GitHub
parent 18a9545e07
commit 7ec59da318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 472 additions and 103 deletions

View File

@ -3,20 +3,32 @@ package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.ResultSerializationException
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.FlowHandleWithClientId
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.time.Duration
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeoutException
import kotlin.reflect.KClass
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotEquals
@ -151,10 +163,61 @@ class FlowWithClientIdTest {
assertEquals(true, finishedFlows[clientId])
}
}
}
@StartableByRPC
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
@Test(timeout = 300_000)
fun `a killed flow's exception can be retrieved after restarting the node`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet(), inMemoryDB = false)) {
val nodeA = startNode(providedName = ALICE_NAME).getOrThrow()
var flowHandle0: FlowHandleWithClientId<Unit>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = nodeA.rpc.startFlowWithClientId(clientId, ::HospitalizeFlow)
nodeA.waitForOvernightObservation(flowHandle0!!.id, 20.seconds)
nodeA.rpc.killFlow(flowHandle0!!.id)
flowHandle0!!.returnValue.getOrThrow(20.seconds)
}
val flowHandle1: FlowHandleWithClientId<Unit> = nodeA.rpc.startFlowWithClientId(clientId, ::HospitalizeFlow)
assertFailsWith<KilledFlowException> {
flowHandle1.returnValue.getOrThrow(20.seconds)
}
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertTrue(nodeA.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(nodeA.hasException(flowHandle0!!.id, KilledFlowException::class))
nodeA.stop()
val nodeARestarted = startNode(providedName = ALICE_NAME).getOrThrow()
assertFailsWith<KilledFlowException> {
nodeARestarted.rpc.reattachFlowWithClientId<Unit>(clientId)!!.returnValue.getOrThrow(20.seconds)
}
}
}
private fun NodeHandle.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean {
return rpc.startFlow(::IsFlowInStatus, id, status.ordinal).returnValue.getOrThrow(20.seconds)
}
private fun <T: Exception> NodeHandle.hasException(id: StateMachineRunId, type: KClass<T>): Boolean {
return rpc.startFlow(::GetExceptionType, id).returnValue.getOrThrow(20.seconds) == type.qualifiedName
}
private fun NodeHandle.waitForOvernightObservation(id: StateMachineRunId, timeout: Duration) {
val timeoutTime = Instant.now().plusSeconds(timeout.seconds)
var exists = false
while (Instant.now().isBefore(timeoutTime) && !exists) {
exists = rpc.startFlow(::IsFlowInStatus, id, Checkpoint.FlowStatus.HOSPITALIZED.ordinal).returnValue.getOrThrow(timeout)
Thread.sleep(1.seconds.toMillis())
}
if (!exists) {
throw TimeoutException("Flow was not kept for observation during timeout duration")
}
}
@StartableByRPC
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
companion object {
var hook: (() -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
@ -166,10 +229,10 @@ internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
suspendableHook?.let { subFlow(it) }
return result
}
}
}
@StartableByRPC
internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>() {
@StartableByRPC
internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>() {
companion object {
val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>())}
}
@ -178,8 +241,52 @@ internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>
override fun call(): OpenFuture<Observable<Unit>> {
return UNSERIALIZABLE_OBJECT
}
}
}
internal class UnserializableException(
@StartableByRPC
internal class HospitalizeFlow: FlowLogic<Unit>() {
@Suspendable
override fun call() {
throw HospitalizeFlowException("time to go to the doctors")
}
}
@StartableByRPC
internal class IsFlowInStatus(private val id: StateMachineRunId, private val ordinal: Int): FlowLogic<Boolean>() {
@Suspendable
override fun call(): Boolean {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
.apply {
setInt(1, ordinal)
setString(2, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
}
}
@StartableByRPC
internal class GetExceptionType(private val id: StateMachineRunId): FlowLogic<String>() {
@Suspendable
override fun call(): String {
return serviceHub.jdbcSession().prepareStatement("select type from node_flow_exceptions where flow_id = ?")
.apply { setString(1, id.uuid.toString()) }
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getString(1)
}
}
}
}
internal class UnserializableException(
val unserializableObject: BrokenMap<Unit, Unit> = BrokenMap()
): CordaRuntimeException("123")
): CordaRuntimeException("123")
}

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowExternalOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException
@ -210,6 +211,26 @@ class KillFlowTest {
}
}
@Test(timeout = 300_000)
fun `killing a hospitalized flow ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatGetsMurderedWhileInTheHospital)
Thread.sleep(5000)
val time = measureTimeMillis {
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties if it was suspended`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
@ -482,6 +503,15 @@ class KillFlowTest {
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedWhileInTheHospital : FlowLogic<Unit>() {
@Suspendable
override fun call() {
throw HospitalizeFlowException("time to go to the doctors")
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedAndSomehowKillsItsFriends(private val parties: List<Party>) : FlowLogic<Unit>() {

View File

@ -11,6 +11,7 @@ import java.util.stream.Stream
/**
* Thread-safe storage of fiber checkpoints.
*/
@Suppress("TooManyFunctions")
interface CheckpointStorage {
/**
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
@ -100,5 +101,7 @@ interface CheckpointStorage {
*/
fun getFlowException(id: StateMachineRunId, throwIfMissing: Boolean = false): Any?
fun addFlowException(id: StateMachineRunId, exception: Throwable)
fun removeFlowException(id: StateMachineRunId): Boolean
}

View File

@ -392,7 +392,7 @@ class DBCheckpointStorage(
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored
errored?.let { createDBFlowException(flowId, it, now) }
errored?.run { createDBFlowException(flowId, errors.last().exception, now) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else {
null
@ -460,7 +460,7 @@ class DBCheckpointStorage(
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored
errored?.let { createDBFlowException(flowId, it, now) }
errored?.run { createDBFlowException(flowId, errors.last().exception, now) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else {
null
@ -575,7 +575,9 @@ class DBCheckpointStorage(
"""select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier)
from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal} or checkpoint.status = ${FlowStatus.FAILED.ordinal}""".trimIndent()
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal}
or checkpoint.status = ${FlowStatus.FAILED.ordinal}
or checkpoint.status = ${FlowStatus.KILLED.ordinal}""".trimIndent()
val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java)
return query.resultList.stream().map {
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId)
@ -600,14 +602,21 @@ class DBCheckpointStorage(
return serializedFlowException?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
override fun addFlowException(id: StateMachineRunId, exception: Throwable) {
currentDBSession().save(createDBFlowException(id.uuid.toString(), exception, clock.instant()))
}
override fun removeFlowException(id: StateMachineRunId): Boolean {
val flowId = id.uuid.toString()
return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId) == 1
return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, id.uuid.toString()) == 1
}
override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'"
currentDBSession().createNativeQuery(update).executeUpdate()
currentDBSession()
.createNativeQuery("Update ${NODE_DATABASE_PREFIX}checkpoints set status = :status, timestamp = :timestamp where flow_id = :id")
.setParameter("status", flowStatus.ordinal)
.setParameter("timestamp", clock.instant())
.setParameter("id", runId.uuid.toString())
.executeUpdate()
}
override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) {
@ -659,18 +668,16 @@ class DBCheckpointStorage(
)
}
private fun createDBFlowException(flowId: String, errorState: ErrorState.Errored, now: Instant): DBFlowException {
return errorState.errors.last().exception.let {
DBFlowException(
private fun createDBFlowException(flowId: String, exception: Throwable, now: Instant): DBFlowException {
return DBFlowException(
flow_id = flowId,
type = it::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true),
message = it.message?.truncate(MAX_EXC_MSG_LENGTH, false),
stackTrace = it.stackTraceToString(),
value = it.storageSerialize().bytes,
type = exception::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true),
message = exception.message?.truncate(MAX_EXC_MSG_LENGTH, false),
stackTrace = exception.stackTraceToString(),
value = exception.storageSerialize().bytes,
persistedInstant = now
)
}
}
private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) {
val session = currentDBSession()

View File

@ -69,6 +69,21 @@ sealed class Action {
*/
data class RemoveCheckpoint(val id: StateMachineRunId, val mayHavePersistentResults: Boolean = false) : Action()
/**
* Remove a flow's exception from the database.
*
* @param id The id of the flow
*/
data class RemoveFlowException(val id: StateMachineRunId) : Action()
/**
* Persist an exception to the database for the related flow.
*
* @param id The id of the flow
* @param exception The exception to persist
*/
data class AddFlowException(val id: StateMachineRunId, val exception: Throwable) : Action()
/**
* Persist the deduplication facts of [deduplicationHandlers].
*/

View File

@ -69,6 +69,8 @@ internal class ActionExecutorImpl(
is Action.CancelFlowTimeout -> cancelFlowTimeout(action)
is Action.MoveFlowToPaused -> executeMoveFlowToPaused(action)
is Action.UpdateFlowStatus -> executeUpdateFlowStatus(action)
is Action.RemoveFlowException -> executeRemoveFlowException(action)
is Action.AddFlowException -> executeAddFlowException(action)
}
}
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
@ -252,4 +254,12 @@ internal class ActionExecutorImpl(
private fun scheduleFlowTimeout(action: Action.ScheduleFlowTimeout) {
stateMachineManager.scheduleFlowTimeout(action.flowId)
}
private fun executeRemoveFlowException(action: Action.RemoveFlowException) {
checkpointStorage.removeFlowException(action.id)
}
private fun executeAddFlowException(action: Action.AddFlowException) {
checkpointStorage.addFlowException(action.id, action.exception)
}
}

View File

@ -11,6 +11,7 @@ import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
@ -78,8 +79,6 @@ internal class SingleThreadedStateMachineManager(
private val VALID_KILL_FLOW_STATUSES = setOf(
Checkpoint.FlowStatus.RUNNABLE,
Checkpoint.FlowStatus.FAILED,
Checkpoint.FlowStatus.COMPLETED,
Checkpoint.FlowStatus.HOSPITALIZED,
Checkpoint.FlowStatus.PAUSED
)
@ -352,28 +351,62 @@ internal class SingleThreadedStateMachineManager(
override fun killFlow(id: StateMachineRunId): Boolean {
val flow = innerState.withLock { flows[id] }
val killFlowResult = if (flow != null) {
flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
val killFlowResult = flow?.let { killInMemoryFlow(it) } ?: killOutOfMemoryFlow(id)
return killFlowResult || flowHospital.dropSessionInit(id)
}
private fun killInMemoryFlow(flow: Flow<*>): Boolean {
val id = flow.fiber.id
return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
if (!flow.fiber.transientState.isKilled) {
flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true)
logger.info("Killing flow $id known to this node.")
// The checkpoint and soft locks are removed here instead of relying on the processing of the next event after setting
// the killed flag. This is to ensure a flow can be removed from the database, even if it is stuck in a infinite loop.
// The checkpoint and soft locks are handled here as well as in a flow's transition. This means that we do not need to rely
// on the processing of the next event after setting the killed flag. This is to ensure a flow can be updated/removed from
// the database, even if it is stuck in a infinite loop.
if (flow.fiber.transientState.isAnyCheckpointPersisted) {
database.transaction {
if (flow.fiber.clientId != null) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED)
checkpointStorage.removeFlowException(id)
checkpointStorage.addFlowException(id, KilledFlowException(id))
} else {
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
serviceHub.vaultService.softLockRelease(id.uuid)
}
}
unfinishedFibers.countDown()
flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true)
scheduleEvent(Event.DoRemainingWork)
true
}
} else {
// It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
database.transaction { checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) }
logger.info("A repeated request to kill flow $id has been made, ignoring...")
false
}
}
}
return killFlowResult || flowHospital.dropSessionInit(id)
private fun killOutOfMemoryFlow(id: StateMachineRunId): Boolean {
return database.transaction {
val checkpoint = checkpointStorage.getCheckpoint(id)
when {
checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.COMPLETED -> {
logger.info("Attempt to kill flow $id which has already completed, ignoring...")
false
}
checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.FAILED -> {
logger.info("Attempt to kill flow $id which has already failed, ignoring...")
false
}
checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.KILLED -> {
logger.info("Attempt to kill flow $id which has already been killed, ignoring...")
false
}
// It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
else -> checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
}
}
private fun markAllFlowsAsPaused() {
@ -969,14 +1002,16 @@ internal class SingleThreadedStateMachineManager(
lastState: StateMachineState
) {
drainFlowEventQueue(flow)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
flow.fiber.clientId?.let {
if (flow.fiber.isKilled) {
// If the flow was killed before fully initialising and persisting its initial checkpoint,
// then remove it from the client id map (removing the final proof of its existence from the node)
if (flow.fiber.isKilled && !flow.fiber.transientState.isAnyCheckpointPersisted) {
clientIdsToFlowIds.remove(it)
} else {
setClientIdAsFailed(it, flow.fiber.id) }
}
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
val flowError = removalReason.flowErrors[0] // TODO what to do with several?
val exception = flowError.exception
(exception as? FlowException)?.originalErrorId = flowError.errorId

View File

@ -3,11 +3,13 @@ package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.FlowException
import net.corda.core.flows.KilledFlowException
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ErrorSessionMessage
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
@ -29,24 +31,34 @@ class KilledFlowTransition(
startingState.checkpoint.checkpointState.sessions,
errorMessages
)
val newCheckpoint = startingState.checkpoint.copy(
status = Checkpoint.FlowStatus.KILLED,
flowState = FlowState.Finished,
checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions)
)
currentState = currentState.copy(
checkpoint = startingState.checkpoint.setSessions(sessions = newSessions),
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
actions += Action.PropagateErrors(
errorMessages,
initiatedSessions,
startingState.senderUUID
)
actions += Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID)
if (!startingState.isFlowResumed) {
actions += Action.CreateTransaction
}
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if (startingState.isAnyCheckpointPersisted) {
// The checkpoint is updated/removed and soft locks are removed directly in [StateMachineManager.killFlow] as well
if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true)
} else if (startingState.isAnyCheckpointPersisted) {
actions += Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.KILLED)
actions += Action.RemoveFlowException(context.id)
actions += Action.AddFlowException(context.id, killedFlowError.exception)
}
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(context.id.uuid)
actions += Action.CommitTransaction(currentState)

View File

@ -4,11 +4,14 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.testing.core.ALICE_NAME
@ -27,14 +30,17 @@ import org.junit.Before
import org.junit.Test
import rx.Observable
import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotEquals
import kotlin.test.assertFalse
import kotlin.test.assertNull
import kotlin.test.assertTrue
@ -242,9 +248,8 @@ class FlowClientIdTests {
}
@Test(timeout = 300_000)
fun `killing a flow, removes the flow from the client id mapping`() {
fun `killing a flow, sets the flow status to killed and adds an exception to the database`() {
var counter = 0
val flowIsRunning = Semaphore(0)
val waitUntilFlowIsRunning = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
var firstRun = true
@ -255,7 +260,7 @@ class FlowClientIdTests {
if (firstRun) {
firstRun = false
waitUntilFlowIsRunning.release()
flowIsRunning.acquire()
sleep(1.minutes)
}
}
}
@ -266,16 +271,66 @@ class FlowClientIdTests {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
waitUntilFlowIsRunning.acquire()
aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowIsRunning.release()
flowHandle0!!.resultFuture.getOrThrow()
}
// a new flow will start since the client id mapping was removed when flow got killed
val flowHandle1: FlowStateMachineHandle<Int> = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
assertFailsWith<KilledFlowException> {
flowHandle1.resultFuture.getOrThrow()
}
assertNotEquals(flowHandle0!!.id, flowHandle1.id)
assertEquals(2, counter)
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertEquals(1, counter)
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
}
@Test(timeout = 300_000)
fun `killing a hospitalized flow, sets the flow status to killed and adds an exception to the database`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0!!.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowHandle0!!.resultFuture.getOrThrow()
}
val flowHandle1: FlowStateMachineHandle<Unit> = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
assertFailsWith<KilledFlowException> {
flowHandle1.resultFuture.getOrThrow()
}
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
}
@Test(timeout = 300_000)
fun `killing a flow twice does nothing`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0!!.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowHandle0!!.resultFuture.getOrThrow()
}
val flowHandle1: FlowStateMachineHandle<Unit> = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
assertFailsWith<KilledFlowException> {
flowHandle1.resultFuture.getOrThrow()
}
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
assertFalse(aliceNode.internals.smm.killFlow(flowHandle0!!.id))
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
}
@Test(timeout = 300_000)
@ -686,6 +741,22 @@ class FlowClientIdTests {
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve exception from killed flow`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0.id)
flowHandle0.resultFuture.getOrThrow()
}
assertFailsWith<KilledFlowException> {
aliceNode.smm.reattachFlowWithClientId<Int>(clientId)?.resultFuture?.getOrThrow()
}
}
@Test(timeout = 300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientIds = listOf("a", "b", "c", "d", "e")
@ -727,9 +798,79 @@ class FlowClientIdTests {
finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.getOrThrow() }
}
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
@Test(timeout = 300_000)
fun `finishedFlowsWithClientIds returns exception for killed flows`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0.id)
flowHandle0.resultFuture.getOrThrow()
}
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds()
assertFailsWith<KilledFlowException> {
finishedFlows.keys.single().let { aliceNode.smm.reattachFlowWithClientId<Int>(it)?.resultFuture?.getOrThrow() }
}
}
private fun TestStartedNode.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean {
return services.database.transaction {
services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
.apply {
setInt(1, status.ordinal)
setString(2, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
}
}
private fun TestStartedNode.hasException(id: StateMachineRunId): Boolean {
return services.database.transaction {
services.jdbcSession().prepareStatement("select count(*) from node_flow_exceptions where flow_id = ?")
.apply { setString(1, id.uuid.toString()) }
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
}
}
private fun TestStartedNode.waitForOvernightObservation(id: StateMachineRunId, timeout: Duration) {
val timeoutTime = Instant.now().plusSeconds(timeout.seconds)
var exists = false
while (Instant.now().isBefore(timeoutTime) && !exists) {
services.database.transaction {
exists = services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
.apply {
setInt(1, Checkpoint.FlowStatus.HOSPITALIZED.ordinal)
setString(2, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
Thread.sleep(1.seconds.toMillis())
}
}
if (!exists) {
throw TimeoutException("Flow was not kept for observation during timeout duration")
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
companion object {
var hook: ((String?) -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
@ -741,9 +882,9 @@ internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
suspendableHook?.let { subFlow(it) }
return result
}
}
}
internal class UnSerializableResultFlow: FlowLogic<Any>() {
internal class UnSerializableResultFlow: FlowLogic<Any>() {
companion object {
var firstRun = true
}
@ -758,4 +899,13 @@ internal class UnSerializableResultFlow: FlowLogic<Any>() {
5 // serializable result
}
}
}
internal class HospitalizeFlow: FlowLogic<Unit>() {
@Suspendable
override fun call() {
throw HospitalizeFlowException("time to go to the doctors")
}
}
}