CORDA-1546: Updated the flow hospital to suspend FinalityHandler if it errors (#3304)

It will re-run automatically from last checkpoint on node restart, allowing the opportunity to resolve the issue, something required when dealing with contract constraint failures.
This commit is contained in:
Shams Asari 2018-06-07 16:18:00 +01:00 committed by GitHub
parent 9514ad6be1
commit f6a23a0216
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 329 additions and 154 deletions

View File

@ -6,6 +6,7 @@ import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.pushToLoggingContext import net.corda.core.internal.pushToLoggingContext
import net.corda.core.node.StatesToRecord import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import java.security.SignatureException import java.security.SignatureException
@ -33,9 +34,9 @@ class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSideSess
TransactionVerificationException::class) TransactionVerificationException::class)
override fun call(): SignedTransaction { override fun call(): SignedTransaction {
if (checkSufficientSignatures) { if (checkSufficientSignatures) {
logger.trace("Receiving a transaction from ${otherSideSession.counterparty}") logger.trace { "Receiving a transaction from ${otherSideSession.counterparty}" }
} else { } else {
logger.trace("Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}") logger.trace { "Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}" }
} }
val stx = otherSideSession.receive<SignedTransaction>().unwrap { val stx = otherSideSession.receive<SignedTransaction>().unwrap {
it.pushToLoggingContext() it.pushToLoggingContext()

View File

@ -378,7 +378,8 @@ fun TransactionBuilder.toWireTransaction(services: ServicesForResolution, serial
fun TransactionBuilder.toLedgerTransaction(services: ServicesForResolution, serializationContext: SerializationContext) = toLedgerTransactionWithContext(services, serializationContext) fun TransactionBuilder.toLedgerTransaction(services: ServicesForResolution, serializationContext: SerializationContext) = toLedgerTransactionWithContext(services, serializationContext)
/** Convenience method to get the package name of a class literal. */ /** Convenience method to get the package name of a class literal. */
val KClass<*>.packageName: String get() = java.`package`.name val KClass<*>.packageName: String get() = java.packageName
val Class<*>.packageName: String get() = `package`.name
inline val Class<*>.isAbstractClass: Boolean get() = Modifier.isAbstract(modifiers) inline val Class<*>.isAbstractClass: Boolean get() = Modifier.isAbstract(modifiers)

View File

@ -161,12 +161,11 @@ class AttachmentSerializationTest {
} }
private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String { private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String {
client.dispose() client = mockNet.restartNode(client) { args ->
client = mockNet.createNode(InternalMockNodeParameters(client.internals.id, client.internals.configuration.myLegalName), nodeFactory = { args ->
object : InternalMockNetwork.MockNode(args) { object : InternalMockNetwork.MockNode(args) {
override fun start() = super.start().apply { attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad } override fun start() = super.start().apply { attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad }
} }
}) }
return (client.smm.allStateMachines[0].stateMachine.resultFuture.apply { mockNet.runNetwork() }.getOrThrow() as ClientResult).attachmentContent return (client.smm.allStateMachines[0].stateMachine.resultFuture.apply { mockNet.runNetwork() }.getOrThrow() as ClientResult).attachmentContent
} }

View File

@ -132,6 +132,22 @@ you require the hash of the node's installed app which supplies the specified co
hash constraints, you almost always want "whatever the current code is" and not a hard-coded hash. So this automatic hash constraints, you almost always want "whatever the current code is" and not a hard-coded hash. So this automatic
constraint placeholder is useful. constraint placeholder is useful.
FinalityFlow
------------
It's possible to encounter contract contraint issues when notarising transactions with the ``FinalityFlow`` on a network
containing multiple versions of the same CorDapp. This will happen when using hash contraints or with zone contraints
if the zone whitelist has missing CorDapp versions. If a participating party fails to validate the **notarised** transaction
then we have a scenerio where the members of the network do not have a consistent view of the ledger.
Therfore, if the finality handler flow (which is run on the counterparty) errors for any reason it will always be sent to
the flow hospital. From there it's suspended waiting to be retried on node restart. This gives the node operator the opportunity
to recover from those errors, which in the case of contract constraint voilations means either updating the CorDapp or
adding its hash to the zone whitelist.
.. note:: This is a temporary issue in the current version of Corda, until we implement some missing features which will
enable a seemless handling of differences in CorDapp versions.
CorDapps as attachments CorDapps as attachments
----------------------- -----------------------

View File

@ -581,6 +581,18 @@ We can also choose to send the transaction to additional parties who aren't one
Only one party has to call ``FinalityFlow`` for a given transaction to be recorded by all participants. It does Only one party has to call ``FinalityFlow`` for a given transaction to be recorded by all participants. It does
**not** need to be called by each participant individually. **not** need to be called by each participant individually.
Because the transaction has already been notarised and the input states consumed, if the participants when receiving the
transaction fail to verify it, or the receiving flow (the finality handler) fails due to some other error, we then have
the scenerio where not all parties have the correct up to date view of the ledger. To recover from this the finality handler
is automatically sent to the flow hospital where it's suspended and retried from its last checkpoint on node restart.
This gives the node operator the opportunity to recover from the error. Until the issue is resolved the node will continue
to retry the flow on each startup.
.. note:: It's possible to forcibly terminate the erroring finality handler using the ``killFlow`` RPC but at the risk
of an inconsistent view of the ledger.
.. note:: A future release will allow retrying hospitalised flows without restarting the node, i.e. via RPC.
CollectSignaturesFlow/SignTransactionFlow CollectSignaturesFlow/SignTransactionFlow
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The list of parties who need to sign a transaction is dictated by the transaction's commands. Once we've signed a The list of parties who need to sign a transaction is dictated by the transaction's commands. Once we've signed a

View File

@ -7,11 +7,19 @@ release, see :doc:`upgrade-notes`.
Unreleased Unreleased
========== ==========
* Introducing the flow hospital - a component of the node that manages flows that have errored and whether they should
be retried from their previous checkpoints or have their errors propagate. Currently it will respond to any error that
occurs during the resolution of a received transaction as part of ``FinalityFlow``. In such a scenerio the receiving
flow will be parked and retried on node restart. This is to allow the node operator to rectify the situation as otherwise
the node will have an incomplete view of the ledger.
* Fixed an issue preventing out of process nodes started by the ``Driver`` from logging to file. * Fixed an issue preventing out of process nodes started by the ``Driver`` from logging to file.
* Fixed an issue with ``CashException`` not being able to deserialise after the introduction of AMQP for RPC. * Fixed an issue with ``CashException`` not being able to deserialise after the introduction of AMQP for RPC.
* Removed -xmx VM argument from Explorer's Capsule setup. This helps avoiding out of memory errors. * Removed -Xmx VM argument from Explorer's Capsule setup. This helps avoiding out of memory errors.
* New ``killFlow`` RPC for killing stuck flows.
* Shell now kills an ongoing flow when CTRL+C is pressed in the terminal. * Shell now kills an ongoing flow when CTRL+C is pressed in the terminal.
@ -25,7 +33,8 @@ Unreleased
* Improved audit trail for ``FinalityFlow`` and related sub-flows. * Improved audit trail for ``FinalityFlow`` and related sub-flows.
* ``NodeStartup`` will now only print node's configuration if ``devMode`` is ``true``, avoiding the risk of printing passwords in a production setup. * The node's configuration is only printed on startup if ``devMode`` is ``true``, avoiding the risk of printing passwords
in a production setup.
* SLF4J's MDC will now only be printed to the console if not empty. No more log lines ending with "{}". * SLF4J's MDC will now only be printed to the console if not empty. No more log lines ending with "{}".

View File

@ -1,23 +0,0 @@
package net.corda.node.services.statemachine
/**
* A flow hospital is a class that is notified when a flow transitions into an error state due to an uncaught exception
* or internal error condition, and when it becomes clean again (e.g. due to a resume).
* Also see [net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor].
*/
interface FlowHospital {
/**
* The flow running in [flowFiber] has errored.
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>)
/**
* The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume.
*/
fun flowCleaned(flowFiber: FlowFiber)
/**
* The flow has been removed from the state machine.
*/
fun flowRemoved(flowFiber: FlowFiber)
}

View File

@ -1,25 +0,0 @@
package net.corda.node.services.statemachine
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
/**
* A simple [FlowHospital] implementation that immediately triggers error propagation when a flow dirties.
*/
object PropagatingFlowHospital : FlowHospital {
private val log = loggerFor<PropagatingFlowHospital>()
override fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
log.debug { "Flow ${flowFiber.id} in state $currentState encountered error" }
flowFiber.scheduleEvent(Event.StartErrorPropagation)
for ((index, error) in errors.withIndex()) {
log.warn("Flow ${flowFiber.id} is propagating error [$index] ", error)
}
}
override fun flowCleaned(flowFiber: FlowFiber) {
throw IllegalStateException("Flow ${flowFiber.id} cleaned after error propagation triggered")
}
override fun flowRemoved(flowFiber: FlowFiber) {}
}

View File

@ -89,6 +89,8 @@ class SingleThreadedStateMachineManager(
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>() val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
} }
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital()
private val mutex = ThreadBox(InnerState()) private val mutex = ThreadBox(InnerState())
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor) private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
private val timeoutScheduler = Executors.newScheduledThreadPool(1) private val timeoutScheduler = Executors.newScheduledThreadPool(1)
@ -103,13 +105,11 @@ class SingleThreadedStateMachineManager(
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: SerializationContext? = null private var checkpointSerializationContext: SerializationContext? = null
private var tokenizableServices: List<Any>? = null
private var actionExecutor: ActionExecutor? = null private var actionExecutor: ActionExecutor? = null
override val allStateMachines: List<FlowLogic<*>> override val allStateMachines: List<FlowLogic<*>>
get() = mutex.locked { flows.values.map { it.fiber.logic } } get() = mutex.locked { flows.values.map { it.fiber.logic } }
private val totalStartedFlows = metrics.counter("Flows.Started") private val totalStartedFlows = metrics.counter("Flows.Started")
private val totalFinishedFlows = metrics.counter("Flows.Finished") private val totalFinishedFlows = metrics.counter("Flows.Finished")
@ -123,7 +123,6 @@ class SingleThreadedStateMachineManager(
override fun start(tokenizableServices: List<Any>) { override fun start(tokenizableServices: List<Any>) {
checkQuasarJavaAgentPresence() checkQuasarJavaAgentPresence()
this.tokenizableServices = tokenizableServices
val checkpointSerializationContext = SerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( val checkpointSerializationContext = SerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
SerializeAsTokenContextImpl(tokenizableServices, SerializationDefaults.SERIALIZATION_FACTORY, SerializationDefaults.CHECKPOINT_CONTEXT, serviceHub) SerializeAsTokenContextImpl(tokenizableServices, SerializationDefaults.SERIALIZATION_FACTORY, SerializationDefaults.CHECKPOINT_CONTEXT, serviceHub)
) )
@ -753,7 +752,7 @@ class SingleThreadedStateMachineManager(
private fun makeTransitionExecutor(): TransitionExecutor { private fun makeTransitionExecutor(): TransitionExecutor {
val interceptors = ArrayList<TransitionInterceptor>() val interceptors = ArrayList<TransitionInterceptor>()
interceptors.add { HospitalisingInterceptor(StaffedFlowHospital, it) } interceptors.add { HospitalisingInterceptor(flowHospital, it) }
if (serviceHub.configuration.devMode) { if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) } interceptors.add { DumpHistoryOnErrorInterceptor(it) }
} }
@ -777,7 +776,7 @@ class SingleThreadedStateMachineManager(
require(lastState.pendingDeduplicationHandlers.isEmpty()) require(lastState.pendingDeduplicationHandlers.isEmpty())
require(lastState.isRemoved) require(lastState.isRemoved)
require(lastState.checkpoint.subFlowStack.size == 1) require(lastState.checkpoint.subFlowStack.size == 1)
sessionToFlow.none { it.value == flow.fiber.id } require(flow.fiber.id !in sessionToFlow.values)
flow.resultFuture.set(removalReason.flowReturnValue) flow.resultFuture.set(removalReason.flowReturnValue)
lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue))) changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue)))

View File

@ -1,89 +1,160 @@
package net.corda.node.services.statemachine package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow import net.corda.core.internal.TimedFlow
import net.corda.core.utilities.loggerFor import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.contextLogger
import net.corda.node.services.FinalityHandler
import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.sql.SQLException import java.sql.SQLException
import java.time.Instant import java.time.Instant
import java.util.concurrent.ConcurrentHashMap import java.util.*
/** /**
* This hospital consults "staff" to see if they can automatically diagnose and treat flows. * This hospital consults "staff" to see if they can automatically diagnose and treat flows.
*/ */
object StaffedFlowHospital : FlowHospital { class StaffedFlowHospital {
private val log = loggerFor<StaffedFlowHospital>() private companion object {
private val log = contextLogger()
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
}
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout) private val mutex = ThreadBox(object {
val patients = HashMap<StateMachineRunId, MedicalHistory>()
private val patients = ConcurrentHashMap<StateMachineRunId, MedicalHistory>() val recordsPublisher = PublishSubject.create<MedicalRecord>()
})
val numberOfPatients get() = patients.size
class MedicalHistory { class MedicalHistory {
val records: MutableList<Record> = mutableListOf() internal val records: MutableList<MedicalRecord> = mutableListOf()
sealed class Record(val suspendCount: Int) {
class Admitted(val at: Instant, suspendCount: Int) : Record(suspendCount) {
override fun toString() = "Admitted(at=$at, suspendCount=$suspendCount)"
}
class Discharged(val at: Instant, suspendCount: Int, val by: Staff, val error: Throwable) : Record(suspendCount) {
override fun toString() = "Discharged(at=$at, suspendCount=$suspendCount, by=$by)"
}
}
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean { fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean {
val lastAdmittanceSuspendCount = (records.last() as MedicalHistory.Record.Admitted).suspendCount val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount
return records.filterIsInstance(MedicalHistory.Record.Discharged::class.java).filter { it.by == by && it.suspendCount == lastAdmittanceSuspendCount }.count() <= max return records
.filterIsInstance<MedicalRecord.Discharged>()
.count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
} }
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
} }
override fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) { /**
* The flow running in [flowFiber] has errored.
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() } val suspendCount = currentState.checkpoint.numberOfSuspends
medicalHistory.records += MedicalHistory.Record.Admitted(Instant.now(), currentState.checkpoint.numberOfSuspends)
for ((index, error) in errors.withIndex()) { val event = mutex.locked {
log.info("Flow ${flowFiber.id} has error [$index]", error) val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() }
if (!errorIsDischarged(flowFiber, currentState, error, medicalHistory)) {
// If any error isn't discharged, then we propagate. val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount)
log.warn("Flow ${flowFiber.id} error was not discharged, propagating.") medicalHistory.records += admitted
flowFiber.scheduleEvent(Event.StartErrorPropagation) recordsPublisher.onNext(admitted)
return
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
val (newRecord, event) = when (report.diagnosis) {
Diagnosis.DISCHARGE -> {
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null)
}
Diagnosis.NOT_MY_SPECIALTY -> {
// None of the staff care for these errors so we let them propagate
log.info("Flow ${flowFiber.id} error allowed to propagate")
Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation)
}
} }
medicalHistory.records += newRecord
recordsPublisher.onNext(newRecord)
event
}
if (event != null) {
flowFiber.scheduleEvent(event)
} }
// If all are discharged, retry.
flowFiber.scheduleEvent(Event.RetryFlowFromSafePoint)
} }
private fun errorIsDischarged(flowFiber: FlowFiber, currentState: StateMachineState, error: Throwable, medicalHistory: MedicalHistory): Boolean { private fun consultStaff(flowFiber: FlowFiber,
for (staffMember in staff) { currentState: StateMachineState,
val diagnosis = staffMember.consult(flowFiber, currentState, error, medicalHistory) errors: List<Throwable>,
if (diagnosis == Diagnosis.DISCHARGE) { medicalHistory: MedicalHistory): ConsultationReport {
medicalHistory.records += MedicalHistory.Record.Discharged(Instant.now(), currentState.checkpoint.numberOfSuspends, staffMember, error) return errors
log.info("Flow ${flowFiber.id} error discharged from hospital by $staffMember") .mapIndexed { index, error ->
return true log.info("Flow ${flowFiber.id} has error [$index]", error)
} val diagnoses: Map<Diagnosis, List<Staff>> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) }
} // We're only interested in the highest priority diagnosis for the error
return false val (diagnosis, by) = diagnoses.entries.minBy { it.key }!!
ConsultationReport(error, diagnosis, by)
}
// And we're only interested in the error with the highest priority diagnosis
.minBy { it.diagnosis }!!
} }
private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List<Staff>)
/**
* The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume.
*/
// It's okay for flows to be cleaned... we fix them now! // It's okay for flows to be cleaned... we fix them now!
override fun flowCleaned(flowFiber: FlowFiber) {} fun flowCleaned(flowFiber: FlowFiber) = Unit
override fun flowRemoved(flowFiber: FlowFiber) { /**
patients.remove(flowFiber.id) * The flow has been removed from the state machine.
*/
fun flowRemoved(flowFiber: FlowFiber) {
mutex.locked { patients.remove(flowFiber.id) }
} }
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
/** Returns a stream of medical records as flows pass through the hospital. */
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
return mutex.locked {
DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed())
}
}
sealed class MedicalRecord {
abstract val flowId: StateMachineRunId
abstract val at: Instant
abstract val suspendCount: Int
data class Admitted(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int) : MedicalRecord()
data class Discharged(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int,
val by: List<Staff>,
val errors: List<Throwable>) : MedicalRecord()
data class KeptInForObservation(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int,
val by: List<Staff>,
val errors: List<Throwable>) : MedicalRecord()
data class NothingWeCanDo(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int) : MedicalRecord()
}
/** The order of the enum values are in priority order. */
enum class Diagnosis { enum class Diagnosis {
/** /** Retry from last safe point. */
* Retry from last safe point.
*/
DISCHARGE, DISCHARGE,
/** /** Park and await intervention. */
* Please try another member of staff. OVERNIGHT_OBSERVATION,
*/ /** Please try another member of staff. */
NOT_MY_SPECIALTY NOT_MY_SPECIALTY
} }
@ -122,7 +193,7 @@ object StaffedFlowHospital : FlowHospital {
} }
private fun mentionsConstraintViolation(exception: Throwable?): Boolean { private fun mentionsConstraintViolation(exception: Throwable?): Boolean {
return exception != null && (exception is org.hibernate.exception.ConstraintViolationException || mentionsConstraintViolation(exception.cause)) return exception != null && (exception is ConstraintViolationException || mentionsConstraintViolation(exception.cause))
} }
} }
@ -152,4 +223,13 @@ object StaffedFlowHospital : FlowHospital {
} }
} }
} }
/**
* Parks [FinalityHandler]s for observation.
*/
object FinalityDoctor : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
return if (currentState.flowLogic is FinalityHandler) Diagnosis.OVERNIGHT_OBSERVATION else Diagnosis.NOT_MY_SPECIALTY
}
}
} }

View File

@ -82,6 +82,8 @@ interface StateMachineManager {
* The event may be replayed if a flow fails and attempts to retry. * The event may be replayed if a flow fails and attempts to retry.
*/ */
fun deliverExternalEvent(event: ExternalEvent) fun deliverExternalEvent(event: ExternalEvent)
val flowHospital: StaffedFlowHospital
} }
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call

View File

@ -12,7 +12,7 @@ import java.util.concurrent.ConcurrentHashMap
* transition. * transition.
*/ */
class HospitalisingInterceptor( class HospitalisingInterceptor(
private val flowHospital: FlowHospital, private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor private val delegate: TransitionExecutor
) : TransitionExecutor { ) : TransitionExecutor {
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>() private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()

View File

@ -0,0 +1,95 @@
package net.corda.node.services
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.finance.POUNDS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.issuedBy
import net.corda.node.internal.StartedNode
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
class FinalityHandlerTest {
private lateinit var mockNet: InternalMockNetwork
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `sent to flow hospital on error and attempted retry on node restart`() {
// Setup a network where only Alice has the finance CorDapp and it sends a cash tx to Bob who doesn't have the
// CorDapp. Bob's FinalityHandler will error when validating the tx.
mockNet = InternalMockNetwork(cordappPackages = emptyList())
val alice = mockNet.createNode(InternalMockNodeParameters(
legalName = ALICE_NAME,
extraCordappPackages = listOf("net.corda.finance.contracts.asset")
))
var bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
val stx = TransactionBuilder(mockNet.defaultNotaryIdentity).let {
Cash().generateIssue(
it,
1000.POUNDS.issuedBy(alice.info.singleIdentity().ref(0)),
bob.info.singleIdentity(),
mockNet.defaultNotaryIdentity
)
alice.services.signInitialTransaction(it)
}
val finalityHandlerIdFuture = bob.smm.track()
.updates
.filter { it.logic is FinalityHandler }
.map { it.logic.runId }
.toFuture()
val finalisedTx = alice.services.startFlow(FinalityFlow(stx)).run {
mockNet.runNetwork()
resultFuture.getOrThrow()
}
val finalityHandlerId = finalityHandlerIdFuture.getOrThrow()
bob.assertFlowSentForObservation(finalityHandlerId)
assertThat(bob.getTransaction(finalisedTx.id)).isNull()
bob = mockNet.restartNode(bob)
// Since we've not done anything to fix the orignal error, we expect the finality handler to be sent to the hospital
// again on restart
bob.assertFlowSentForObservation(finalityHandlerId)
assertThat(bob.getTransaction(finalisedTx.id)).isNull()
}
private fun StartedNode<*>.assertFlowSentForObservation(runId: StateMachineRunId) {
val keptInForObservation = smm.flowHospital
.track()
.let { it.updates.startWith(it.snapshot) }
.filter { it.flowId == runId }
.ofType(KeptInForObservation::class.java)
.toBlocking()
.first()
assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor)
}
private fun StartedNode<*>.getTransaction(id: SecureHash): SignedTransaction? {
return database.transaction {
services.validatedTransactions.getTransaction(id)
}
}
}

View File

@ -692,9 +692,6 @@ class FlowFrameworkPersistenceTests {
fun `flow loaded from checkpoint will respond to messages from before start`() { fun `flow loaded from checkpoint will respond to messages from before start`() {
aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
bobNode.internals.disableDBCloseOnStop()
bobNode.dispose() // kill receiver
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>() val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
} }
@ -752,14 +749,11 @@ class FlowFrameworkPersistenceTests {
//////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers //region Helpers
private inline fun <reified P : FlowLogic<*>> StartedNode<MockNode>.restartAndGetRestoredFlow() = internals.run { private inline fun <reified P : FlowLogic<*>> StartedNode<MockNode>.restartAndGetRestoredFlow(): P {
disableDBCloseOnStop() // Handover DB to new node copy val newNode = mockNet.restartNode(this)
stop()
val newNode = mockNet.createNode(InternalMockNodeParameters(id, configuration.myLegalName))
newNode.internals.acceptableLiveFiberCountOnStop = 1 newNode.internals.acceptableLiveFiberCountOnStop = 1
manuallyCloseDB()
mockNet.runNetwork() mockNet.runNetwork()
newNode.getSingleFlow<P>().first return newNode.getSingleFlow<P>().first
} }
private fun assertSessionTransfers(vararg expected: SessionTransfer) { private fun assertSessionTransfers(vararg expected: SessionTransfer) {

View File

@ -7,6 +7,8 @@ import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.packageName
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
@ -15,10 +17,13 @@ import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
import net.corda.testing.node.internal.MessagingServiceSpy import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.newContext import net.corda.testing.node.internal.newContext
import net.corda.testing.node.internal.setMessagingServiceSpy import net.corda.testing.node.internal.setMessagingServiceSpy
import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.hibernate.exception.ConstraintViolationException
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
@ -30,21 +35,23 @@ import kotlin.test.assertNull
class RetryFlowMockTest { class RetryFlowMockTest {
private lateinit var mockNet: InternalMockNetwork private lateinit var mockNet: InternalMockNetwork
private lateinit var internalNodeA: StartedNode<InternalMockNetwork.MockNode> private lateinit var nodeA: StartedNode<MockNode>
private lateinit var internalNodeB: StartedNode<InternalMockNetwork.MockNode> private lateinit var nodeB: StartedNode<MockNode>
@Before @Before
fun start() { fun start() {
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.`package`.name)) mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.packageName))
internalNodeA = mockNet.createNode() nodeA = mockNet.createNode()
internalNodeB = mockNet.createNode() nodeB = mockNet.createNode()
mockNet.startNodes() mockNet.startNodes()
RetryFlow.count = 0 RetryFlow.count = 0
SendAndRetryFlow.count = 0 SendAndRetryFlow.count = 0
RetryInsertFlow.count = 0 RetryInsertFlow.count = 0
} }
private fun <T> StartedNode<InternalMockNetwork.MockNode>.startFlow(logic: FlowLogic<T>): CordaFuture<T> = this.services.startFlow(logic, this.services.newContext()).getOrThrow().resultFuture private fun <T> StartedNode<MockNode>.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
return this.services.startFlow(logic, this.services.newContext()).flatMap { it.resultFuture }
}
@After @After
fun cleanUp() { fun cleanUp() {
@ -53,14 +60,14 @@ class RetryFlowMockTest {
@Test @Test
fun `Single retry`() { fun `Single retry`() {
assertEquals(Unit, internalNodeA.startFlow(RetryFlow(1)).get()) assertEquals(Unit, nodeA.startFlow(RetryFlow(1)).get())
assertEquals(2, RetryFlow.count) assertEquals(2, RetryFlow.count)
} }
@Test @Test
fun `Retry forever`() { fun `Retry forever`() {
Assertions.assertThatThrownBy { assertThatThrownBy {
internalNodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow() nodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow()
}.isInstanceOf(LimitedRetryCausingError::class.java) }.isInstanceOf(LimitedRetryCausingError::class.java)
assertEquals(5, RetryFlow.count) assertEquals(5, RetryFlow.count)
} }
@ -68,14 +75,14 @@ class RetryFlowMockTest {
@Test @Test
fun `Retry does not set senderUUID`() { fun `Retry does not set senderUUID`() {
val messagesSent = mutableListOf<Message>() val messagesSent = mutableListOf<Message>()
val partyB = internalNodeB.info.legalIdentities.first() val partyB = nodeB.info.legalIdentities.first()
internalNodeA.setMessagingServiceSpy(object : MessagingServiceSpy(internalNodeA.network) { nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message) messagesSent.add(message)
messagingService.send(message, target) messagingService.send(message, target)
} }
}) })
internalNodeA.startFlow(SendAndRetryFlow(1, partyB)).get() nodeA.startFlow(SendAndRetryFlow(1, partyB)).get()
assertNotNull(messagesSent.first().senderUUID) assertNotNull(messagesSent.first().senderUUID)
assertNull(messagesSent.last().senderUUID) assertNull(messagesSent.last().senderUUID)
assertEquals(2, SendAndRetryFlow.count) assertEquals(2, SendAndRetryFlow.count)
@ -83,26 +90,25 @@ class RetryFlowMockTest {
@Test @Test
fun `Retry duplicate insert`() { fun `Retry duplicate insert`() {
assertEquals(Unit, internalNodeA.startFlow(RetryInsertFlow(1)).get()) assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get())
assertEquals(2, RetryInsertFlow.count) assertEquals(2, RetryInsertFlow.count)
} }
@Test @Test
fun `Patient records do not leak in hospital`() { fun `Patient records do not leak in hospital`() {
val patientCountBefore = StaffedFlowHospital.numberOfPatients assertEquals(Unit, nodeA.startFlow(RetryFlow(1)).get())
assertEquals(Unit, internalNodeA.startFlow(RetryFlow(1)).get())
// Need to make sure the state machine has finished. Otherwise this test is flakey. // Need to make sure the state machine has finished. Otherwise this test is flakey.
mockNet.waitQuiescent() mockNet.waitQuiescent()
assertEquals(patientCountBefore, StaffedFlowHospital.numberOfPatients) assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
assertEquals(2, RetryFlow.count) assertEquals(2, RetryFlow.count)
} }
} }
class LimitedRetryCausingError : org.hibernate.exception.ConstraintViolationException("Test message", SQLException(), "Test constraint") class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
class RetryCausingError : SQLException("deadlock") class RetryCausingError : SQLException("deadlock")
class RetryFlow(val i: Int) : FlowLogic<Unit>() { class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
companion object { companion object {
var count = 0 var count = 0
} }
@ -121,7 +127,7 @@ class RetryFlow(val i: Int) : FlowLogic<Unit>() {
} }
@InitiatingFlow @InitiatingFlow
class SendAndRetryFlow(val i: Int, val other: Party) : FlowLogic<Unit>() { class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
companion object { companion object {
var count = 0 var count = 0
} }
@ -137,8 +143,9 @@ class SendAndRetryFlow(val i: Int, val other: Party) : FlowLogic<Unit>() {
} }
} }
@Suppress("unused")
@InitiatedBy(SendAndRetryFlow::class) @InitiatedBy(SendAndRetryFlow::class)
class ReceiveFlow2(val other: FlowSession) : FlowLogic<Unit>() { class ReceiveFlow2(private val other: FlowSession) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
val received = other.receive<String>().unwrap { it } val received = other.receive<String>().unwrap { it }
@ -146,7 +153,7 @@ class ReceiveFlow2(val other: FlowSession) : FlowLogic<Unit>() {
} }
} }
class RetryInsertFlow(val i: Int) : FlowLogic<Unit>() { class RetryInsertFlow(private val i: Int) : FlowLogic<Unit>() {
companion object { companion object {
var count = 0 var count = 0
} }

View File

@ -30,7 +30,6 @@ import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.core.singleIdentity import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow import net.corda.testing.node.internal.startFlow
import org.junit.After import org.junit.After
import org.junit.Test import org.junit.Test
@ -71,8 +70,7 @@ class NodePair(private val mockNet: InternalMockNetwork) {
client.services.startFlow(clientLogic) client.services.startFlow(clientLogic)
while (!serverRunning.get()) mockNet.runNetwork(1) while (!serverRunning.get()) mockNet.runNetwork(1)
if (rebootClient) { if (rebootClient) {
client.dispose() client = mockNet.restartNode(client)
client = mockNet.createNode(InternalMockNodeParameters(client.internals.id))
} }
return uncheckedCast(client.smm.allStateMachines.single().stateMachine) return uncheckedCast(client.smm.allStateMachines.single().stateMachine)
} }

View File

@ -385,6 +385,17 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
return node return node
} }
fun <N : MockNode> restartNode(node: StartedNode<N>, nodeFactory: (MockNodeArgs) -> N): StartedNode<N> {
node.internals.disableDBCloseOnStop()
node.dispose()
return createNode(
InternalMockNodeParameters(legalName = node.internals.configuration.myLegalName, forcedID = node.internals.id),
nodeFactory
)
}
fun restartNode(node: StartedNode<MockNode>): StartedNode<MockNode> = restartNode(node, defaultFactory)
fun baseDirectory(nodeId: Int): Path = filesystem.getPath("/nodes/$nodeId") fun baseDirectory(nodeId: Int): Path = filesystem.getPath("/nodes/$nodeId")
/** /**
@ -439,8 +450,7 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
messagingNetwork.stop() messagingNetwork.stop()
} }
// Test method to block until all scheduled activity, active flows /** Block until all scheduled activity, active flows and network activity has ceased. */
// and network activity has ceased.
fun waitQuiescent() { fun waitQuiescent() {
busyLatch.await() busyLatch.await()
} }