CORDA-3202 Add a specific exception for flows to hospitalise themselves (#5767)

* Introducing a new type of exception and a new hospital staff member to pause flows by immediately hospitalising them.

* Renaming exception to "HospitalizeFlowException".

* Making HospitalizeFlowException an open class.

* Overloading constructors of HospitalizeFlowException to be available in Java.

* Using Throwable#mentionsThrowable.

* Moving HospitalizeFlowException in its own file.

* Update kdocs for HospitalizeFlowException and StaffedFlowHospital#SedationNurse.

* Added tests, testing various HospitalizeFlowException types thrown.

* Fix Detekt issues.

* Imports optimizing.

* Add safe casting.

* Update api-flows and node-flow-hospital docs.

* Minor code comment change.

* Add DOCSTART-DOCEND signs in HospitalizeFlowException for makeDocs. It is referenced by api-flows.rst.

* Minor change in note.

* Code formatting.

* Remove comment.

* Remove if statement that makes example worse.

* Remove redundant comment.

* Moving 'Internal Corda errors' at the bottom.

* Changing node-flow-hospital.rst as per review.

* Change HospitalizeFlowException description as per review.

* Adding an example for FlowException.

* Minor indentation fix.

* Update FlowException example label as per review.

* Correcting handling of custom exception.
This commit is contained in:
Kyriakos Tharrouniatis 2019-12-11 10:35:58 +00:00 committed by Rick Parker
parent 0d7c10a846
commit b0903efa50
6 changed files with 245 additions and 3 deletions

View File

@ -0,0 +1,17 @@
package net.corda.core.flows
import net.corda.core.CordaRuntimeException
// DOCSTART 1
/**
* This exception allows a flow to pass itself to the flow hospital. Once the flow reaches
* the hospital it will determine how to progress depending on what [cause]s the exception wraps.
* Assuming there are no important wrapped exceptions, throwing a [HospitalizeFlowException]
* will place the flow in overnight observation, where it will be replayed at a later time.
*/
open class HospitalizeFlowException(message: String?, cause: Throwable?) : CordaRuntimeException(message, cause) {
constructor(message: String?) : this(message, null)
constructor(cause: Throwable?) : this(cause?.toString(), cause)
constructor() : this(null, null)
}
// DOCEND 1

View File

@ -773,6 +773,77 @@ There are many scenarios in which throwing a ``FlowException`` would be appropri
* The transaction does not match the parameters of the deal as discussed
* You are reneging on a deal
Below is an example using ``FlowException``:
.. container:: codeset
.. sourcecode:: kotlin
@InitiatingFlow
class SendMoneyFlow(private val moneyRecipient: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val money = Money(10.0, USD)
try {
initiateFlow(moneyRecipient).sendAndReceive<Unit>(money)
} catch (e: FlowException) {
if (e.cause is WrongCurrencyException) {
log.info(e.message, e)
}
}
}
}
@InitiatedBy(SendMoneyFlow::class)
class ReceiveMoneyFlow(private val moneySender: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receivedMoney = moneySender.receive<Money>().unwrap { it }
if (receivedMoney.currency != GBP) {
// Wrap a thrown Exception with a FlowException for the counter party to receive it.
throw FlowException(WrongCurrencyException("I only accept GBP, sorry!"))
}
}
}
class WrongCurrencyException(message: String) : CordaRuntimeException(message)
HospitalizeFlowException
------------------------
Some operations can fail intermittently and will succeed if they are tried again at a later time. Flows have the ability to halt their
execution in such situations. By throwing a ``HospitalizeFlowException`` a flow will stop and retry at a later time (on the next node restart).
A ``HospitalizeFlowException`` can be defined in various ways:
.. container:: codeset
.. literalinclude:: ../../core/src/main/kotlin/net/corda/core/flows/HospitalizeFlowException.kt
:language: kotlin
:start-after: DOCSTART 1
:end-before: DOCEND 1
.. note:: If a ``HospitalizeFlowException`` is wrapping or extending an exception already being handled by the :doc:`node-flow-hospital`, the outcome of a flow may change. For example, the flow
could instantly retry or terminate if a critical error occurred.
.. note:: ``HospitalizeFlowException`` can be extended for customized exceptions. These exceptions will be treated in the same way when thrown.
Below is an example of a flow that should retry again in the future if an error occurs:
.. container:: codeset
.. sourcecode:: kotlin
class TryAccessServiceFlow(): FlowLogic<Unit>() {
override fun call() {
try {
val code = serviceHub.cordaService(HTTPService::class.java).get() // throws UnknownHostException.
} catch (e: UnknownHostException) {
// Accessing the service failed! It might be offline. Let's hospitalize this flow, and have it retry again on next node startup.
throw HospitalizeFlowException("Service might be offline!", e)
}
}
}
ProgressTracker
---------------
We can give our flow a progress tracker. This allows us to see the flow's progress visually in our node's CRaSH shell.

View File

@ -70,6 +70,10 @@ Specifically, there are two main ways a flow is hospitalized:
The time is hard to document as the notary members, if actually alive, will inform the requester of the ETA of a response.
This can occur an infinite number of times. i.e. we never give up notarising. No intervention required.
* ``HospitalizeFlowException``:
The aim of this exception is to provide user code a way to retry a flow from its last checkpoint if a known intermittent failure occurred.
Any ``HospitalizeFlowException`` that is thrown and not handled by any of the scenarios detailed above, will be kept in for observation.
* **Internal Corda errors**:
Flows that experience errors from inside the Corda statemachine, that are not handled by any of the scenarios details above, will be retried a number of times
and then kept in for observation if the error continues.

View File

@ -8,6 +8,7 @@ import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryException
@ -18,6 +19,7 @@ import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyContract.SingleOwnerState
@ -31,9 +33,14 @@ import net.corda.testing.node.internal.findCordapp
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.util.Random
import java.sql.SQLException
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowHospitalTest {
@ -88,6 +95,101 @@ class FlowHospitalTest {
}
}
@Test
fun `HospitalizeFlowException thrown`() {
var observationCounter: Int = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
driver(
DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts"))
)
) {
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::ThrowingHospitalisedExceptionFlow, HospitalizeFlowException::class.java)
.returnValue.getOrThrow(5.seconds)
}
assertEquals(1, observationCounter)
}
}
@Test
fun `Custom exception wrapping HospitalizeFlowException thrown`() {
var observationCounter: Int = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
driver(
DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts"))
)
) {
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::ThrowingHospitalisedExceptionFlow, WrappingHospitalizeFlowException::class.java)
.returnValue.getOrThrow(5.seconds)
}
assertEquals(1, observationCounter)
}
}
@Test
fun `Custom exception extending HospitalizeFlowException thrown`() {
var observationCounter: Int = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
driver(
DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts"))
)
) {
// one node will be enough for this testing
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::ThrowingHospitalisedExceptionFlow, ExtendingHospitalizeFlowException::class.java)
.returnValue.getOrThrow(5.seconds)
}
assertEquals(1, observationCounter)
}
}
@Test
fun `HospitalizeFlowException cloaking an important exception thrown`() {
var dischargedCounter = 0
var observationCounter: Int = 0
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++dischargedCounter
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
driver(
DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts"))
)
) {
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow()
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
assertFailsWith<TimeoutException> {
aliceClient.startFlow(::ThrowingHospitalisedExceptionFlow, CloakingHospitalizeFlowException::class.java)
.returnValue.getOrThrow(5.seconds)
}
assertEquals(0, observationCounter)
// Since the flow will keep getting discharged from hospital dischargedCounter will be > 1.
assertTrue(dischargedCounter > 0)
}
}
@StartableByRPC
class IssueFlow(val notary: Party): FlowLogic<StateAndRef<SingleOwnerState>>() {
@ -165,4 +267,32 @@ class FlowHospitalTest {
class DoubleSpendException(message: String, cause: Throwable): FlowException(message, cause)
@StartableByRPC
class ThrowingHospitalisedExceptionFlow(
// Starting this Flow from an RPC client: if we pass in an encapsulated exception within another exception then the wrapping
// exception, when deserialized, will get grounded into a CordaRuntimeException (this happens in ThrowableSerializer#fromProxy).
private val hospitalizeFlowExceptionClass: Class<*>): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val throwable = hospitalizeFlowExceptionClass.newInstance()
(throwable as? Throwable)?.let {
throw it
}
}
}
class WrappingHospitalizeFlowException(cause: HospitalizeFlowException = HospitalizeFlowException()) : Exception(cause)
class ExtendingHospitalizeFlowException : HospitalizeFlowException()
class CloakingHospitalizeFlowException : HospitalizeFlowException() { // HospitalizeFlowException wrapping important exception
init {
// Wrapping an SQLException with "deadlock" as a message should lead the flow being handled
// by StaffedFlowHospital#DeadlockNurse as well and therefore having the flow discharged
// and not getting it for overnight observation.
setCause(SQLException("deadlock"))
}
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import net.corda.core.crypto.newSecureRandom
import net.corda.core.flows.FlowException
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.ReceiveTransactionFlow
import net.corda.core.flows.StateMachineRunId
@ -46,7 +47,8 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
FinalityDoctor,
TransientConnectionCardiologist,
DatabaseEndocrinologist,
TransitionErrorGeneralPractitioner
TransitionErrorGeneralPractitioner,
SedationNurse
)
@VisibleForTesting
@ -555,6 +557,24 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
}
}
/**
* Keeps the flow in for overnight observation if [HospitalizeFlowException] is received.
*/
object SedationNurse : Staff {
override fun consult(
flowFiber: FlowFiber,
currentState: StateMachineState,
newError: Throwable,
history: FlowMedicalHistory
): Diagnosis {
return if (newError.mentionsThrowable(HospitalizeFlowException::class.java)) {
Diagnosis.OVERNIGHT_OBSERVATION
} else {
Diagnosis.NOT_MY_SPECIALTY
}
}
}
}
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {