CORDA-3681 - Store serialised exception in database for failed and hospitalized flows (#6539)

Integrate `DBFlowException` with the rest of the checkpoint schema, so now 
we are saving the flow's exception result in the database.

Making statemachine not remove `FAILED` flows' checkpoints from the 
database if they are started with a clientId.

Retrieve the DBFlowException from the database to construct a 
`FlowStateMachineHandle` future and complete exceptionally the flow's result 
future for requests (`startFlowDynamicWithClientId`) that pick FAILED flows , 
started with client id, of status Removed.

On killing a flow the client id mapping of the flow gets removed.

The storage serialiser is used for serialising exceptions. Note, that if an
exception cannot be serialised, it will not fail and will instead be stored
as a `CordaRuntimeException`. This could be improved in future
changes.
This commit is contained in:
Kyriakos Tharrouniatis 2020-08-04 16:33:44 +01:00 committed by GitHub
parent 35bfa6945f
commit 5d42b8847c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 411 additions and 286 deletions

View File

@ -15,7 +15,6 @@ import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
@ -77,9 +76,10 @@ class FlowIsKilledTest {
assertEquals(11, AFlowThatWantsToDieAndKillsItsFriends.position)
assertTrue(AFlowThatWantsToDieAndKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatWantsToDieAndKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
assertEquals(1, alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(2, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
}
}
}
@ -109,9 +109,10 @@ class FlowIsKilledTest {
handle.returnValue.getOrThrow(1.minutes)
}
assertEquals(11, AFlowThatGetsMurderedByItsFriendResponder.position)
assertEquals(2, alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, alice.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
}
}
@ -360,18 +361,4 @@ class FlowIsKilledTest {
}
}
}
@StartableByRPC
class GetNumberOfFailedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession()
.prepareStatement("select count(*) from node_checkpoints where status = ${Checkpoint.FlowStatus.FAILED.ordinal}")
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
}

View File

@ -508,4 +508,8 @@ class FlowReloadAfterCheckpointTest {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
}
}
}
internal class BrokenMap<K, V>(delegate: MutableMap<K, V> = mutableMapOf()) : MutableMap<K, V> by delegate {
override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose")
}

View File

@ -161,7 +161,7 @@ class FlowRetryTest {
}
@Test(timeout = 300_000)
fun `General external exceptions are not retried and propagate`() {
fun `general external exceptions are not retried and propagate`() {
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
@ -176,10 +176,7 @@ class FlowRetryTest {
).returnValue.getOrThrow()
}
assertEquals(0, GeneralExternalFailureFlow.retryCount)
assertEquals(
1,
nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get()
)
assertEquals(0, nodeAHandle.rpc.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get())
}
}
@ -304,10 +301,6 @@ enum class Step { First, BeforeInitiate, AfterInitiate, AfterInitiateSendReceive
data class Visited(val sessionNum: Int, val iterationNum: Int, val step: Step)
class BrokenMap<K, V>(delegate: MutableMap<K, V> = mutableMapOf()) : MutableMap<K, V> by delegate {
override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose")
}
@StartableByRPC
class RetryFlow() : FlowLogic<String>(), IdempotentFlow {
companion object {

View File

@ -1,6 +1,7 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.concurrent.OpenFuture
@ -76,6 +77,34 @@ class FlowWithClientIdTest {
assertTrue(errorMessage!!.contains("Unable to create an object serializer for type class ${UnserializableResultFlow.UNSERIALIZABLE_OBJECT::class.java.name}"))
}
}
@Test(timeout=300_000)
fun `If flow has an unserializable exception result then it gets converted into a 'CordaRuntimeException'`() {
ResultFlow.hook = {
throw UnserializableException()
}
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val node = startNode().getOrThrow()
// the below exception is the one populating the flows future. It will get serialized on node jvm, sent over to client and
// deserialized on client's.
val e0 = assertFailsWith<CordaRuntimeException> {
node.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow()
}
// the below exception is getting fetched from the database first, and deserialized on node's jvm,
// then serialized on node jvm, sent over to client and deserialized on client's.
val e1 = assertFailsWith<CordaRuntimeException> {
node.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow()
}
assertTrue(e0 !is UnserializableException)
assertTrue(e1 !is UnserializableException)
assertEquals(UnserializableException::class.java.name, e0.originalExceptionClassName)
assertEquals(UnserializableException::class.java.name, e1.originalExceptionClassName)
}
}
}
@StartableByRPC
@ -104,3 +133,7 @@ internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>
return UNSERIALIZABLE_OBJECT
}
}
internal class UnserializableException(
val unserializableObject: BrokenMap<Unit, Unit> = BrokenMap()
): CordaRuntimeException("123")

View File

@ -26,7 +26,6 @@ import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
@ -61,7 +60,8 @@ class KillFlowTest {
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@ -88,11 +88,12 @@ class KillFlowTest {
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(2, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(2, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, charlie.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
}
}
}
@ -112,7 +113,8 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@ -150,7 +152,8 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
assertEquals(1, startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val checkpoints = startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
@Test(timeout = 300_000)
@ -168,7 +171,8 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@ -188,7 +192,8 @@ class KillFlowTest {
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@ -217,11 +222,12 @@ class KillFlowTest {
}
assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(2, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(2, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, charlie.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
}
}
}
@ -251,11 +257,12 @@ class KillFlowTest {
assertTrue(AFlowThatGetsMurderedByItsFriend.receivedKilledException)
assertFalse(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
assertEquals(2, alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, alice.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(2, charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
assertEquals(1, charlie.rpc.startFlow(::GetNumberOfFailedCheckpointsFlow).returnValue.getOrThrow(20.seconds))
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
}
}
@ -588,18 +595,4 @@ class KillFlowTest {
}
}
}
@StartableByRPC
class GetNumberOfFailedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession()
.prepareStatement("select count(*) from node_checkpoints where status = ${Checkpoint.FlowStatus.FAILED.ordinal}")
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
}

View File

@ -75,5 +75,13 @@ interface CheckpointStorage {
*/
fun getFlowResult(id: StateMachineRunId, throwIfMissing: Boolean = false): Any?
/**
* Load a flow exception from the store. If [throwIfMissing] is true then it throws an [IllegalStateException]
* if the flow exception is missing in the database.
*/
fun getFlowException(id: StateMachineRunId, throwIfMissing: Boolean = false): Any?
fun removeFlowException(id: StateMachineRunId): Boolean
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
}

View File

@ -442,14 +442,20 @@ class DBCheckpointStorage(
null
}
val exceptionDetails = updateDBFlowException(flowId, checkpoint, now)
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored
errored?.let { createDBFlowException(flowId, it, now) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else {
null
}
// Updates to children entities ([DBFlowCheckpointBlob], [DBFlowResult], [DBFlowException], [DBFlowMetadata]) are not cascaded to children tables.
val dbFlowCheckpoint = DBFlowCheckpoint(
flowId = flowId,
blob = blob,
result = dbFlowResult,
exceptionDetails = exceptionDetails,
exceptionDetails = dbFlowException,
flowMetadata = dummyDBFlowMetadata, // [DBFlowMetadata] will only update its 'finish_time' when a checkpoint finishes
status = checkpoint.status,
compatible = checkpoint.compatible,
@ -461,6 +467,7 @@ class DBCheckpointStorage(
currentDBSession().update(dbFlowCheckpoint)
blob?.let { currentDBSession().update(it) }
dbFlowResult?.let { currentDBSession().save(it) }
dbFlowException?.let { currentDBSession().save(it) }
if (checkpoint.isFinished()) {
setDBFlowMetadataFinishTime(flowId, now)
}
@ -475,16 +482,15 @@ class DBCheckpointStorage(
query.executeUpdate()
}
// DBFlowResult and DBFlowException to be integrated with rest of schema
@Suppress("MagicNumber")
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
var deletedRows = 0
val flowId = id.uuid.toString()
deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId)
deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId)
deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId)
deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId)
deletedRows += deleteRow(DBFlowCheckpoint::class.java, DBFlowCheckpoint::flowId.name, flowId)
// exceptionId?.let { deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, it.toString()) }
return deletedRows >= 2
}
@ -527,6 +533,10 @@ class DBCheckpointStorage(
return currentDBSession().find(DBFlowResult::class.java, id.uuid.toString())
}
private fun getDBFlowException(id: StateMachineRunId): DBFlowException? {
return currentDBSession().find(DBFlowException::class.java, id.uuid.toString())
}
override fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
@ -539,13 +549,13 @@ class DBCheckpointStorage(
}
}
// This method needs modification once CORDA-3681 is implemented to include FAILED flows as well
override fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier)
val jpqlQuery =
"""select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier)
from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal}""".trimIndent()
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal} or checkpoint.status = ${FlowStatus.FAILED.ordinal}""".trimIndent()
val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java)
return query.resultList.stream().map {
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId)
@ -561,6 +571,20 @@ class DBCheckpointStorage(
return serializedFlowResult?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
override fun getFlowException(id: StateMachineRunId, throwIfMissing: Boolean): Any? {
val dbFlowException = getDBFlowException(id)
if (throwIfMissing && dbFlowException == null) {
throw IllegalStateException("Flow's $id exception was not found in the database. Something is very wrong.")
}
val serializedFlowException = dbFlowException?.value?.let { SerializedBytes<Any>(it) }
return serializedFlowException?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
override fun removeFlowException(id: StateMachineRunId): Boolean {
val flowId = id.uuid.toString()
return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId) == 1
}
override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'"
currentDBSession().createNativeQuery(update).executeUpdate()
@ -610,37 +634,6 @@ class DBCheckpointStorage(
)
}
/**
* Creates, updates or deletes the error related to the current flow/checkpoint.
*
* This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually.
*
* A [DBFlowException] is created if [DBFlowCheckpoint.exceptionDetails] does not exist and the [Checkpoint] has an error attached to it.
* The existing [DBFlowException] is updated if [DBFlowCheckpoint.exceptionDetails] exists and the [Checkpoint] has an error.
* The existing [DBFlowException] is deleted if [DBFlowCheckpoint.exceptionDetails] exists and the [Checkpoint] has no error.
* Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] are related to no errors.
*/
// DBFlowException to be integrated with rest of schema
// Add a flag notifying if an exception is already saved in the database for below logic (are we going to do this after all?)
private fun updateDBFlowException(flowId: String, checkpoint: Checkpoint, now: Instant): DBFlowException? {
val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(flowId, it, now) }
// if (checkpoint.dbExoSkeleton.dbFlowExceptionId != null) {
// if (exceptionDetails != null) {
// exceptionDetails.flow_id = checkpoint.dbExoSkeleton.dbFlowExceptionId!!
// currentDBSession().update(exceptionDetails)
// } else {
// val session = currentDBSession()
// val entity = session.get(DBFlowException::class.java, checkpoint.dbExoSkeleton.dbFlowExceptionId)
// session.delete(entity)
// return null
// }
// } else if (exceptionDetails != null) {
// currentDBSession().save(exceptionDetails)
// checkpoint.dbExoSkeleton.dbFlowExceptionId = exceptionDetails.flow_id
// }
return exceptionDetails
}
private fun createDBFlowException(flowId: String, errorState: ErrorState.Errored, now: Instant): DBFlowException {
return errorState.errors.last().exception.let {
DBFlowException(
@ -648,7 +641,7 @@ class DBCheckpointStorage(
type = it::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true),
message = it.message?.truncate(MAX_EXC_MSG_LENGTH, false),
stackTrace = it.stackTraceToString(),
value = null, // TODO to be populated upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
value = it.storageSerialize().bytes,
persistedInstant = now
)
}

View File

@ -5,7 +5,6 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.instrument.JavaAgent
import com.codahale.metrics.Gauge
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowException
@ -192,7 +191,7 @@ internal class SingleThreadedStateMachineManager(
if (finishedFlowResult.status == Checkpoint.FlowStatus.COMPLETED) {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, true)
} else {
// - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, false)
}
} ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
}
@ -431,7 +430,15 @@ internal class SingleThreadedStateMachineManager(
it.mapNotNull { (id, serializedCheckpoint) ->
// If a flow is added before start() then don't attempt to restore it
innerState.withLock { if (id in flows) return@mapNotNull null }
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@mapNotNull null
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
if (!checkpointStorage.removeFlowException(id)) {
logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.")
return@mapNotNull null
}
}
} ?: return@mapNotNull null
flowCreator.createFlowFromCheckpoint(id, checkpoint)
}.toList()
}
@ -466,13 +473,24 @@ internal class SingleThreadedStateMachineManager(
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val checkpoint = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return@transaction null
}
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
if (!checkpointStorage.removeFlowException(flowId)) {
logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not retry.")
return@transaction null
}
}
} ?: return@transaction null
} ?: return
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(flowId, checkpoint, reloadCheckpointAfterSuspendCount = currentState.reloadCheckpointAfterSuspendCount)
?: return
@ -817,7 +835,12 @@ internal class SingleThreadedStateMachineManager(
drainFlowEventQueue(flow)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
flow.fiber.clientId?.let { setClientIdAsFailed(it, flow.fiber.id) }
flow.fiber.clientId?.let {
if (flow.fiber.isKilled) {
clientIdsToFlowIds.remove(it)
} else {
setClientIdAsFailed(it, flow.fiber.id) }
}
val flowError = removalReason.flowErrors[0] // TODO what to do with several?
val exception = flowError.exception
(exception as? FlowException)?.originalErrorId = flowError.errorId
@ -882,14 +905,13 @@ internal class SingleThreadedStateMachineManager(
is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
is FlowWithClientIdStatus.Removed -> {
val flowId = existingStatus.flowId
val resultFuture = if (existingStatus.succeeded) {
val flowResult = database.transaction { checkpointStorage.getFlowResult(existingStatus.flowId, throwIfMissing = true) }
doneFuture(flowResult)
} else {
// this block will be implemented upon implementing CORDA-3681 - for now just return a dummy exception
val flowException = CordaRuntimeException("dummy")
openFuture<Any?>().apply { setException(flowException) }
val flowException =
database.transaction { checkpointStorage.getFlowException(existingStatus.flowId, throwIfMissing = true) }
openFuture<Any?>().apply { setException(flowException as Throwable) }
}
doneClientIdFuture(flowId, resultFuture, clientId)

View File

@ -61,9 +61,15 @@ class ErrorFlowTransition(
if (!currentState.isRemoved) {
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
Action.RemoveCheckpoint(context.id)
} else {
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
}
actions.addAll(arrayOf(
Action.CreateTransaction,
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
removeOrPersistCheckpoint,
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,

View File

@ -21,6 +21,9 @@
<createIndex indexName="node_flow_results_idx" tableName="node_flow_results" clustered="false" unique="true">
<column name="flow_id"/>
</createIndex>
<createIndex indexName="node_flow_exceptions_idx" tableName="node_flow_exceptions" clustered="false" unique="true">
<column name="flow_id"/>
</createIndex>
<createIndex indexName="node_flow_metadata_idx" tableName="node_flow_metadata" clustered="false" unique="true">
<column name="flow_id"/>
</createIndex>

View File

@ -1,12 +1,15 @@
package net.corda.node.services.persistence
import net.corda.core.CordaRuntimeException
import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.toSet
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.utilities.contextLogger
@ -38,7 +41,6 @@ import org.junit.After
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.time.Clock
@ -181,51 +183,6 @@ class DBCheckpointStorageTests {
}
}
@Ignore
@Test(timeout = 300_000)
fun `removing a checkpoint deletes from all checkpoint tables`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
}
val updatedCheckpoint = checkpoint.addError(exception).copy(result = "The result")
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState()) }
database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
// The result not stored yet
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(2, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
newCheckpointStorage()
database.transaction {
assertThat(checkpointStorage.checkpoints()).isEmpty()
}
database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
}
@Ignore
@Test(timeout = 300_000)
fun `removing a checkpoint when there is no result does not fail`() {
val exception = IllegalStateException("I am a naughty exception")
@ -240,11 +197,9 @@ class DBCheckpointStorageTests {
database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
// The result not stored yet
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(2, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
@ -263,8 +218,7 @@ class DBCheckpointStorageTests {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
// The saving of checkpoint blobs needs to be fixed
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
}
}
@ -479,7 +433,6 @@ class DBCheckpointStorageTests {
}
}
@Ignore
@Test(timeout = 300_000)
fun `update checkpoint with error information creates a new error database record`() {
val exception = IllegalStateException("I am a naughty exception")
@ -498,58 +451,12 @@ class DBCheckpointStorageTests {
assertNotNull(exceptionDetails)
assertEquals(exception::class.java.name, exceptionDetails!!.type)
assertEquals(exception.message, exceptionDetails.message)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
}
}
@Ignore
@Test(timeout = 300_000)
fun `update checkpoint with new error information updates the existing error database record`() {
val illegalStateException = IllegalStateException("I am a naughty exception")
val illegalArgumentException = IllegalArgumentException("I am a very naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
}
val updatedCheckpoint1 = checkpoint.addError(illegalStateException)
val updatedSerializedFlowState1 = updatedCheckpoint1.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint1, updatedSerializedFlowState1, updatedCheckpoint1.serializeCheckpointState()) }
// Set back to clean
val updatedCheckpoint2 = checkpoint.addError(illegalArgumentException)
val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2, updatedCheckpoint2.serializeCheckpointState()) }
database.transaction {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails
assertNotNull(exceptionDetails)
assertEquals(illegalArgumentException::class.java.name, exceptionDetails!!.type)
assertEquals(illegalArgumentException.message, exceptionDetails.message)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
}
}
@Test(timeout = 300_000)
fun `clean checkpoints delete the error record from the database`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.serializeFlowState()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
}
val updatedCheckpoint = checkpoint.addError(exception)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState()) }
database.transaction {
// Checkpoint always returns clean error state when retrieved via [getCheckpoint]
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
}
// Set back to clean
database.transaction { checkpointStorage.updateCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState()) }
database.transaction {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
val deserializedException = exceptionDetails.value?.let { SerializedBytes<Any>(it) }?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
// IllegalStateException does not implement [CordaThrowable] therefore gets deserialized as a [CordaRuntimeException]
assertTrue(deserializedException is CordaRuntimeException)
val cordaRuntimeException = deserializedException as CordaRuntimeException
assertEquals(IllegalStateException::class.java.name, cordaRuntimeException.originalExceptionClassName)
assertEquals("I am a naughty exception", cordaRuntimeException.originalMessage!!)
}
}
@ -642,7 +549,6 @@ class DBCheckpointStorageTests {
}
}
@Ignore
@Test(timeout = 300_000)
fun `-not greater than DBCheckpointStorage_MAX_STACKTRACE_LENGTH- stackTrace gets persisted as a whole`() {
val smallerDummyStackTrace = ArrayList<StackTraceElement>()
@ -675,7 +581,6 @@ class DBCheckpointStorageTests {
}
}
@Ignore
@Test(timeout = 300_000)
fun `-greater than DBCheckpointStorage_MAX_STACKTRACE_LENGTH- stackTrace gets truncated to MAX_LENGTH_VARCHAR, and persisted`() {
val smallerDummyStackTrace = ArrayList<StackTraceElement>()
@ -721,9 +626,9 @@ class DBCheckpointStorageTests {
private fun iterationsBasedOnLineSeparatorLength() = when {
System.getProperty("line.separator").length == 1 -> // Linux or Mac
158
78
System.getProperty("line.separator").length == 2 -> // Windows
152
75
else -> throw IllegalStateException("Unknown line.separator")
}
@ -794,7 +699,7 @@ class DBCheckpointStorageTests {
}
@Test(timeout = 300_000)
fun `updateCheckpoint setting DBFlowCheckpoint_blob to null whenever flow fails or gets hospitalized doesn't break ORM relationship`() {
fun `'updateCheckpoint' setting 'DBFlowCheckpoint_blob' to null whenever flow fails or gets hospitalized doesn't break ORM relationship`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
@ -803,8 +708,8 @@ class DBCheckpointStorageTests {
}
database.transaction {
val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED) // the exact same behaviour applies for 'HOSPITALIZED' as well
checkpointStorage.updateCheckpoint(id, paused.checkpoint, serializedFlowState, paused.checkpoint.serializeCheckpointState())
val failed = checkpoint.addError(IllegalStateException()) // the exact same behaviour applies for 'HOSPITALIZED' as well
checkpointStorage.updateCheckpoint(id, failed, serializedFlowState, failed.serializeCheckpointState())
}
database.transaction {
@ -831,7 +736,6 @@ class DBCheckpointStorageTests {
}
}
// This test needs modification once CORDA-3681 is implemented to include FAILED flows as well
@Test(timeout = 300_000)
fun `'getFinishedFlowsResultsMetadata' fetches flows results metadata for finished flows only`() {
val (_, checkpoint) = newCheckpoint(1)
@ -863,7 +767,10 @@ class DBCheckpointStorageTests {
}.toList()
assertEquals(6, checkpointsInDb)
assertEquals(Checkpoint.FlowStatus.COMPLETED, resultsMetadata.single().second.status)
val finishedStatuses = resultsMetadata.map { it.second.status }
assertTrue(Checkpoint.FlowStatus.COMPLETED in finishedStatuses)
assertTrue(Checkpoint.FlowStatus.FAILED in finishedStatuses)
}
data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
@ -928,7 +835,8 @@ class DBCheckpointStorageTests {
exception
)
), 0, false
)
),
status = Checkpoint.FlowStatus.FAILED
)
}

View File

@ -5,6 +5,7 @@ import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.persistence.DBCheckpointStorage
@ -15,13 +16,15 @@ import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId
import net.corda.core.flows.KilledFlowException
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import rx.Observable
import java.lang.IllegalArgumentException
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.UUID
@ -29,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
@ -58,6 +62,8 @@ class FlowClientIdTests {
SingleThreadedStateMachineManager.onClientIDNotFound = null
SingleThreadedStateMachineManager.onCallingStartFlowInternal = null
SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = null
StaffedFlowHospital.onFlowErrorPropagated.clear()
}
@Test(timeout=300_000)
@ -180,19 +186,96 @@ class FlowClientIdTests {
Assert.assertEquals(result0, result1)
}
@Ignore // this is to be unignored upon implementing CORDA-3681
@Test(timeout=300_000)
fun `flow's exception is available after flow's lifetime if flow is started with a client id`() {
ResultFlow.hook = { throw IllegalStateException() }
fun `failing flow's exception is available after flow's lifetime if flow is started with a client id`() {
var counter = 0
ResultFlow.hook = {
counter++
throw IllegalStateException()
}
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Int>? = null
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle0!!.resultFuture.getOrThrow()
}
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
var flowHandle1: FlowStateMachineHandle<Int>? = null
assertFailsWith<CordaRuntimeException> {
flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1!!.resultFuture.getOrThrow()
}
// Assert no new flow has started
assertEquals(flowHandle0!!.id, flowHandle1!!.id)
assertEquals(1, counter)
}
@Test(timeout=300_000)
fun `failed flow's exception is available after flow's lifetime on node start if flow was started with a client id`() {
var counter = 0
ResultFlow.hook = {
counter++
throw IllegalStateException()
}
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Int>? = null
assertFailsWith<IllegalStateException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle0!!.resultFuture.getOrThrow()
}
aliceNode = mockNet.restartNode(aliceNode)
var flowHandle1: FlowStateMachineHandle<Int>? = null
assertFailsWith<CordaRuntimeException> {
flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1!!.resultFuture.getOrThrow()
}
// Assert no new flow has started
assertEquals(flowHandle0!!.id, flowHandle1!!.id)
assertEquals(1, counter)
}
@Test(timeout=300_000)
fun `killing a flow, removes the flow from the client id mapping`() {
var counter = 0
val flowIsRunning = Semaphore(0)
val waitUntilFlowIsRunning = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
var firstRun = true
@Suspendable
override fun call() {
++counter
if (firstRun) {
firstRun = false
waitUntilFlowIsRunning.release()
flowIsRunning.acquire()
}
}
}
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Int>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
waitUntilFlowIsRunning.acquire()
aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowIsRunning.release()
flowHandle0!!.resultFuture.getOrThrow()
}
// a new flow will start since the client id mapping was removed when flow got killed
val flowHandle1: FlowStateMachineHandle<Int> = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1.resultFuture.getOrThrow()
assertNotEquals(flowHandle0!!.id, flowHandle1.id)
assertEquals(2, counter)
}
@Test(timeout=300_000)
@ -326,6 +409,7 @@ class FlowClientIdTests {
Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
}
// the below test has to be made available only in ENT
// @Test(timeout=300_000)
// fun `on node restart -paused- flows with client id are hook-able`() {
// val clientId = UUID.randomUUID().toString()
@ -424,6 +508,7 @@ class FlowClientIdTests {
assertEquals(1, counter)
}
// the below test has to be made available only in ENT
// @Test(timeout=300_000)
// fun `On 'startFlowInternal' throwing, subsequent request with same client hits the time window in which the previous request was about to remove the client id mapping`() {
// val clientId = UUID.randomUUID().toString()
@ -461,7 +546,6 @@ class FlowClientIdTests {
// assertEquals(0, counter)
// }
// This test needs modification once CORDA-3681 is implemented to check that 'node_flow_exceptions' gets a row
@Test(timeout=300_000)
fun `if flow fails to serialize its result then the result gets converted to an exception result`() {
val clientId = UUID.randomUUID().toString()
@ -474,8 +558,7 @@ class FlowClientIdTests {
val checkpointStatus = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatus)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
// uncomment the below line once CORDA-3681 is implemented
//assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
}
assertFailsWith<CordaRuntimeException> {
@ -497,6 +580,49 @@ class FlowClientIdTests {
val result = aliceNode.services.startFlowWithClientId(clientId, UnSerializableResultFlow()).resultFuture.getOrThrow()
assertEquals(5, result)
}
@Test(timeout=300_000)
fun `flow that fails does not retain its checkpoint nor its exception in the database if not started with a client id`() {
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlow(ExceptionFlow { IllegalStateException("another exception") }).resultFuture.getOrThrow()
}
aliceNode.services.database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
}
}
@Test(timeout=300_000)
fun `subsequent request to failed flow that cannot find a 'DBFlowException' in the database, fails with 'IllegalStateException'`() {
ResultFlow.hook = {
// just throwing a different exception from the one expected out of startFlowWithClientId second call below ([IllegalStateException])
// to be sure [IllegalStateException] gets thrown from [DBFlowException] that is missing
throw IllegalArgumentException()
}
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Int>? = null
assertFailsWith<IllegalArgumentException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle0!!.resultFuture.getOrThrow()
}
// manually remove [DBFlowException] from the database to impersonate missing [DBFlowException]
val removed = aliceNode.services.database.transaction {
aliceNode.internals.checkpointStorage.removeFlowException(flowHandle0!!.id)
}
assertTrue(removed)
val e = assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)).resultFuture.getOrThrow()
}
assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message)
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {

View File

@ -30,6 +30,7 @@ import net.corda.core.internal.declaredField
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -64,7 +65,6 @@ import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.getMessage
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId
import org.apache.commons.lang3.exception.ExceptionUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
@ -75,7 +75,6 @@ import org.junit.Assert.assertNotEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import rx.Notification
import rx.Observable
@ -86,6 +85,7 @@ import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeoutException
import java.util.function.Predicate
import kotlin.concurrent.thread
import kotlin.reflect.KClass
import kotlin.streams.toList
import kotlin.test.assertFailsWith
@ -309,8 +309,7 @@ class FlowFrameworkTests {
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
.withStackTraceContaining("Received counter-flow exception from peer")
bobNode.database.transaction {
val checkpoint = bobNode.internals.checkpointStorage.checkpoints().single()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpoint.status)
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
@ -686,6 +685,7 @@ class FlowFrameworkTests {
flowState = flowFiber!!.transientState.checkpoint.flowState
if (firstExecution) {
firstExecution = false
throw HospitalizeFlowException()
} else {
dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
@ -701,7 +701,7 @@ class FlowFrameworkTests {
}
assertFailsWith<TimeoutException> {
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow(30.seconds) // wait till flow gets hospitalized
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow(10.seconds) // wait till flow gets hospitalized
}
// flow is in hospital
assertTrue(flowState is FlowState.Unstarted)
@ -712,11 +712,10 @@ class FlowFrameworkTests {
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, checkpoint.status)
}
// restart Node - flow will be loaded from checkpoint
firstExecution = false
aliceNode = mockNet.restartNode(aliceNode)
futureFiber.get().resultFuture.getOrThrow() // wait until the flow has completed
// checkpoint states ,after flow retried, before and after suspension
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, dbCheckpointStatusBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, dbCheckpointStatusBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, inMemoryCheckpointStatusBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, dbCheckpointStatusAfterSuspension)
}
@ -734,6 +733,7 @@ class FlowFrameworkTests {
flowState = flowFiber!!.transientState.checkpoint.flowState
if (firstExecution) {
firstExecution = false
throw HospitalizeFlowException()
} else {
dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
@ -744,7 +744,7 @@ class FlowFrameworkTests {
}
assertFailsWith<TimeoutException> {
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow(30.seconds) // wait till flow gets hospitalized
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow(10.seconds) // wait till flow gets hospitalized
}
// flow is in hospital
assertTrue(flowState is FlowState.Started)
@ -753,41 +753,13 @@ class FlowFrameworkTests {
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, checkpoint.status)
}
// restart Node - flow will be loaded from checkpoint
firstExecution = false
aliceNode = mockNet.restartNode(aliceNode)
futureFiber.get().resultFuture.getOrThrow() // wait until the flow has completed
// checkpoint states ,after flow retried, after suspension
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, dbCheckpointStatus)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, dbCheckpointStatus)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, inMemoryCheckpointStatus)
}
// Upon implementing CORDA-3681 unignore this test; DBFlowException is not currently integrated
@Ignore
@Test(timeout=300_000)
fun `Checkpoint is updated in DB with FAILED status and the error when flow fails`() {
var flowId: StateMachineRunId? = null
val e = assertFailsWith<FlowException> {
val fiber = aliceNode.services.startFlow(ExceptionFlow { FlowException("Just an exception") })
flowId = fiber.id
fiber.resultFuture.getOrThrow()
}
aliceNode.database.transaction {
val checkpoint = aliceNode.internals.checkpointStorage.checkpoints().single()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpoint.status)
// assert all fields of DBFlowException
val persistedException = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowId!!)!!.exceptionDetails
assertEquals(FlowException::class.java.name, persistedException!!.type)
assertEquals("Just an exception", persistedException.message)
assertEquals(ExceptionUtils.getStackTrace(e), persistedException.stackTrace)
assertEquals(null, persistedException.value)
}
}
// Upon implementing CORDA-3681 unignore this test; DBFlowException is not currently integrated
@Ignore
@Test(timeout=300_000)
fun `Checkpoint is updated in DB with HOSPITALIZED status and the error when flow is kept for overnight observation` () {
var flowId: StateMachineRunId? = null
@ -803,10 +775,13 @@ class FlowFrameworkTests {
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, checkpoint.status)
// assert all fields of DBFlowException
val persistedException = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowId!!)!!.exceptionDetails
assertEquals(HospitalizeFlowException::class.java.name, persistedException!!.type)
assertEquals("Overnight observation", persistedException.message)
assertEquals(null, persistedException.value)
val exceptionDetails = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowId!!)!!.exceptionDetails
assertEquals(HospitalizeFlowException::class.java.name, exceptionDetails!!.type)
assertEquals("Overnight observation", exceptionDetails.message)
val deserializedException = exceptionDetails.value?.let { SerializedBytes<Any>(it) }?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
assertNotNull(deserializedException)
val hospitalizeFlowException = deserializedException as HospitalizeFlowException
assertEquals("Overnight observation", hospitalizeFlowException.message)
}
}
@ -836,7 +811,81 @@ class FlowFrameworkTests {
assertEquals(null, persistedException)
}
//region Helpers
// When ported to ENT use the existing API there to properly retry the flow
@Test(timeout=300_000)
fun `Hospitalized flow, resets to 'RUNNABLE' and clears exception when retried`() {
var firstRun = true
var counter = 0
val waitUntilHospitalizedTwice = Semaphore(-1)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++counter
if (firstRun) {
firstRun = false
val fiber = FlowStateMachineImpl.currentStateMachine()!!
thread {
// schedule a [RetryFlowFromSafePoint] after the [OvernightObservation] gets scheduled by the hospital
Thread.sleep(2000)
fiber.scheduleEvent(Event.RetryFlowFromSafePoint)
}
}
waitUntilHospitalizedTwice.release()
}
var counterRes = 0
StaffedFlowHospital.onFlowResuscitated.add { _, _, _ -> ++counterRes }
aliceNode.services.startFlow(ExceptionFlow { HospitalizeFlowException("hospitalizing") })
waitUntilHospitalizedTwice.acquire()
assertEquals(2, counter)
assertEquals(0, counterRes)
}
@Test(timeout=300_000)
fun `Hospitalized flow, resets to 'RUNNABLE' and clears database exception on node start`() {
var checkpointStatusAfterRestart: Checkpoint.FlowStatus? = null
var dbExceptionAfterRestart: List<DBCheckpointStorage.DBFlowException>? = null
var secondRun = false
SuspendingFlow.hookBeforeCheckpoint = {
if(secondRun) {
aliceNode.database.transaction {
checkpointStatusAfterRestart = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
dbExceptionAfterRestart = findRecordsFromDatabase()
}
} else {
secondRun = true
}
throw HospitalizeFlowException("hospitalizing")
}
var counter = 0
val waitUntilHospitalized = Semaphore(0)
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++counter
waitUntilHospitalized.release()
}
var counterRes = 0
StaffedFlowHospital.onFlowResuscitated.add { _, _, _ -> ++counterRes }
aliceNode.services.startFlow(SuspendingFlow())
waitUntilHospitalized.acquire()
Thread.sleep(3000) // wait until flow saves overnight observation state in database
aliceNode = mockNet.restartNode(aliceNode)
waitUntilHospitalized.acquire()
Thread.sleep(3000) // wait until flow saves overnight observation state in database
assertEquals(2, counter)
assertEquals(0, counterRes)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusAfterRestart)
assertEquals(0, dbExceptionAfterRestart!!.size)
}
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@ -1207,7 +1256,7 @@ internal class SuspendingFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
stateMachine.hookBeforeCheckpoint()
sleep(1.seconds) // flow checkpoints => checkpoint is in DB
stateMachine.suspend(FlowIORequest.ForceCheckpoint, maySkipCheckpoint = false) // flow checkpoints => checkpoint is in DB
stateMachine.hookAfterCheckpoint()
}
}