ENT-6355 Empty flow arguments on error transition (#4307)

A user passed in a `FlowLogic` as an argument into another `FlowLogic`
called `subFlow` on it and had it throw an exception.

This all occurred before the first checkpoint, causing the state machine
to try and persist a FAILED checkpoint containing the flow's arguments.
Because the arguments contained a `FlowLogic` that had been started via
`subFlow` it held a reference to `FlowLogic._stateMachine` which cannot
be serialized.

This caused the flow to fail when trying to persist the fact that it
failed.

The flow arguments are now emptied during `ErrorFlowTransition` to
resolve this issue which mimics the behaviour of the first suspend.
Note, this only takes the arguments out of the serialized checkpoint, it
does not affect the flow metadata and therefore a flow's arguments can
still be viewed.

Co-authored-by: Dan Newton <dan.newton@r3.com>
This commit is contained in:
Kyriakos Tharrouniatis 2021-12-01 14:57:30 +00:00 committed by GitHub
parent e93e7c2846
commit 3cbfb0e024
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 154 additions and 1 deletions

View File

@ -1,6 +1,10 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoSerializable
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.PermissionException
import net.corda.core.CordaRuntimeException
@ -11,10 +15,14 @@ import net.corda.core.flows.ResultSerializationException
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doOnError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.FlowHandleWithClientId
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
@ -23,9 +31,12 @@ import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import rx.Observable
@ -33,6 +44,7 @@ import java.time.Duration
import java.time.Instant
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.reflect.KClass
import kotlin.test.assertEquals
@ -498,6 +510,124 @@ class FlowWithClientIdTest {
}
}
// This test is not very realistic because the scenario it happens under is also not very realistic.
@Test(timeout = 300_000)
fun `flow started with client id that fails before its first checkpoint that contains an unserializable argument will be persited as FAILED`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val nodeA = startNode(NodeParameters(ALICE_NAME)).getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::QuickFailingFlow, LazyUnserializableObject())
val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
flowHandle.returnValue.getOrThrow(20.seconds)
}.withMessage("I have failed quickly")
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
reattachedFlowHandle?.returnValue?.getOrThrow()
}.withMessage("I have failed quickly")
assertTrue(nodeA.hasStatus(flowHandle.id, Checkpoint.FlowStatus.FAILED))
val arguments = nodeA.rpc.startFlow(::GetFlowInitialArgumentsFromMetadata, flowHandle.id).returnValue.getOrThrow(20.seconds)
assertEquals(arguments.size, 1)
assertTrue(arguments.single() is LazyUnserializableObject)
}
}
// This test has been added to replicate the exact scenario a user experienced.
@Test(timeout = 300_000)
fun `flow started with client id that fails before its first checkpoint with subflow'd flow will be persited as FAILED`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val nodeA = startNode(NodeParameters(ALICE_NAME)).getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::PassedInFailingFlow, SuperQuickFailingFlow())
val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
flowHandle.returnValue.getOrThrow(20.seconds)
}.withMessage("I have failed quickly")
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
reattachedFlowHandle?.returnValue?.getOrThrow()
}.withMessage("I have failed quickly")
assertTrue(nodeA.hasStatus(flowHandle.id, Checkpoint.FlowStatus.FAILED))
val arguments = nodeA.rpc.startFlow(::GetFlowInitialArgumentsFromMetadata, flowHandle.id).returnValue.getOrThrow(20.seconds)
assertEquals(arguments.size, 1)
assertTrue(arguments.single() is SuperQuickFailingFlow)
}
}
@Test(timeout = 300_000)
fun `flow started with client id that fails can use doOnError to process the exception`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val nodeA = startNode(NodeParameters(ALICE_NAME)).getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::SuperQuickFailingFlow)
val reattachedFlowHandle = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)
val lock = CountDownLatch(1)
val reattachedLock = CountDownLatch(1)
flowHandle.returnValue.doOnError {
lock.countDown()
}
reattachedFlowHandle?.returnValue?.doOnError {
reattachedLock.countDown()
}
assertTrue(lock.await(20, TimeUnit.SECONDS))
assertTrue(reattachedLock.await(20, TimeUnit.SECONDS))
assertTrue(flowHandle.returnValue.isDone)
assertTrue(reattachedFlowHandle!!.returnValue.isDone)
}
}
@CordaSerializable
@StartableByRPC
internal class QuickFailingFlow(private val lazyUnserializableObject: LazyUnserializableObject) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
lazyUnserializableObject.prop = UnserializableObject()
throw CordaRuntimeException("I have failed quickly")
}
}
@CordaSerializable
class LazyUnserializableObject(var prop: UnserializableObject? = null)
@CordaSerializable
class UnserializableObject : KryoSerializable {
override fun write(kryo: Kryo?, output: Output?) {
throw IllegalStateException("Cannot be serialized")
}
override fun read(kryo: Kryo?, input: Input?) {
throw IllegalStateException("Cannot be read")
}
}
@StartableByRPC
internal class PassedInFailingFlow(private val flow: SuperQuickFailingFlow) : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
return subFlow(flow)
}
}
@CordaSerializable
@StartableByRPC
internal class SuperQuickFailingFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
throw CordaRuntimeException("I have failed quickly")
}
}
@StartableByRPC
internal class ResultFlow<A>(private val result: A) : FlowLogic<A>() {
companion object {
@ -568,6 +698,24 @@ class FlowWithClientIdTest {
}
}
@StartableByRPC
internal class GetFlowInitialArgumentsFromMetadata(private val id: StateMachineRunId) : FlowLogic<List<Any?>>() {
@Suspendable
override fun call(): List<Any?> {
val argumentBytes = serviceHub.jdbcSession().prepareStatement("select flow_parameters from node_flow_metadata where flow_id = ?")
.apply {
setString(1, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getBytes(1)
}
}
return argumentBytes.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
}
internal class UnserializableException(
val unserializableObject: BrokenMap<Unit, Unit> = BrokenMap()
) : CordaRuntimeException("123")

View File

@ -63,7 +63,12 @@ class ErrorFlowTransition(
status = Checkpoint.FlowStatus.FAILED,
flowState = FlowState.Finished,
checkpointState = startingState.checkpoint.checkpointState.copy(
numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1,
invocationContext = if (startingState.checkpoint.checkpointState.invocationContext.arguments!!.isNotEmpty()) {
startingState.checkpoint.checkpointState.invocationContext.copy(arguments = emptyList())
} else {
startingState.checkpoint.checkpointState.invocationContext
}
)
)
currentState = currentState.copy(