Merge remote-tracking branch 'open/master' into os-merge-21280c0

# Conflicts:
#	node/src/main/kotlin/net/corda/node/services/statemachine/FlowHospital.kt
#	node/src/main/kotlin/net/corda/node/services/statemachine/PropagatingFlowHospital.kt
This commit is contained in:
Shams Asari 2018-06-07 18:08:37 +01:00
commit 7886ef9450
20 changed files with 588 additions and 174 deletions

View File

@ -16,6 +16,7 @@ import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.internal.pushToLoggingContext
import net.corda.core.node.StatesToRecord
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.trace
import net.corda.core.utilities.unwrap
import java.security.SignatureException
@ -43,9 +44,9 @@ class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSideSess
TransactionVerificationException::class)
override fun call(): SignedTransaction {
if (checkSufficientSignatures) {
logger.trace("Receiving a transaction from ${otherSideSession.counterparty}")
logger.trace { "Receiving a transaction from ${otherSideSession.counterparty}" }
} else {
logger.trace("Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}")
logger.trace { "Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}" }
}
val stx = otherSideSession.receive<SignedTransaction>().unwrap {
it.pushToLoggingContext()

View File

@ -388,7 +388,8 @@ fun TransactionBuilder.toWireTransaction(services: ServicesForResolution, serial
fun TransactionBuilder.toLedgerTransaction(services: ServicesForResolution, serializationContext: SerializationContext) = toLedgerTransactionWithContext(services, serializationContext)
/** Convenience method to get the package name of a class literal. */
val KClass<*>.packageName: String get() = java.`package`.name
val KClass<*>.packageName: String get() = java.packageName
val Class<*>.packageName: String get() = `package`.name
inline val Class<*>.isAbstractClass: Boolean get() = Modifier.isAbstract(modifiers)

View File

@ -171,12 +171,11 @@ class AttachmentSerializationTest {
}
private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String {
client.dispose()
client = mockNet.createNode(InternalMockNodeParameters(client.internals.id, client.internals.configuration.myLegalName), nodeFactory = { args ->
client = mockNet.restartNode(client) { args ->
object : InternalMockNetwork.MockNode(args) {
override fun start() = super.start().apply { attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad }
}
})
}
return (client.smm.allStateMachines[0].stateMachine.resultFuture.apply { mockNet.runNetwork() }.getOrThrow() as ClientResult).attachmentContent
}

View File

@ -132,6 +132,22 @@ you require the hash of the node's installed app which supplies the specified co
hash constraints, you almost always want "whatever the current code is" and not a hard-coded hash. So this automatic
constraint placeholder is useful.
FinalityFlow
------------
It's possible to encounter contract contraint issues when notarising transactions with the ``FinalityFlow`` on a network
containing multiple versions of the same CorDapp. This will happen when using hash contraints or with zone contraints
if the zone whitelist has missing CorDapp versions. If a participating party fails to validate the **notarised** transaction
then we have a scenerio where the members of the network do not have a consistent view of the ledger.
Therfore, if the finality handler flow (which is run on the counterparty) errors for any reason it will always be sent to
the flow hospital. From there it's suspended waiting to be retried on node restart. This gives the node operator the opportunity
to recover from those errors, which in the case of contract constraint voilations means either updating the CorDapp or
adding its hash to the zone whitelist.
.. note:: This is a temporary issue in the current version of Corda, until we implement some missing features which will
enable a seemless handling of differences in CorDapp versions.
CorDapps as attachments
-----------------------

View File

@ -519,6 +519,18 @@ We can also choose to send the transaction to additional parties who aren't one
Only one party has to call ``FinalityFlow`` for a given transaction to be recorded by all participants. It does
**not** need to be called by each participant individually.
Because the transaction has already been notarised and the input states consumed, if the participants when receiving the
transaction fail to verify it, or the receiving flow (the finality handler) fails due to some other error, we then have
the scenerio where not all parties have the correct up to date view of the ledger. To recover from this the finality handler
is automatically sent to the flow hospital where it's suspended and retried from its last checkpoint on node restart.
This gives the node operator the opportunity to recover from the error. Until the issue is resolved the node will continue
to retry the flow on each startup.
.. note:: It's possible to forcibly terminate the erroring finality handler using the ``killFlow`` RPC but at the risk
of an inconsistent view of the ledger.
.. note:: A future release will allow retrying hospitalised flows without restarting the node, i.e. via RPC.
CollectSignaturesFlow/SignTransactionFlow
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The list of parties who need to sign a transaction is dictated by the transaction's commands. Once we've signed a

View File

@ -21,6 +21,11 @@ done by specifying the installation target on the command line:
sudo -H pip install --install-option '--install-data=/usr/local' Sphinx
sudo -H pip install --install-option '--install-data=/usr/local' sphinx_rtd_theme
.. warning:: When installing Sphinx, you may see the following error message: "Found existing installation: six 1.4.1
Cannot uninstall 'six'. It is a distutils installed project and thus we cannot accurately determine which files
belong to it which would lead to only a partial uninstall.". If so, run the install with the
``--ignore-installed six`` flag.
Build
-----

View File

@ -8,11 +8,19 @@ Unreleased
==========
* Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code.
* Introducing the flow hospital - a component of the node that manages flows that have errored and whether they should
be retried from their previous checkpoints or have their errors propagate. Currently it will respond to any error that
occurs during the resolution of a received transaction as part of ``FinalityFlow``. In such a scenerio the receiving
flow will be parked and retried on node restart. This is to allow the node operator to rectify the situation as otherwise
the node will have an incomplete view of the ledger.
* Fixed an issue preventing out of process nodes started by the ``Driver`` from logging to file.
* Fixed an issue with ``CashException`` not being able to deserialise after the introduction of AMQP for RPC.
* Removed -xmx VM argument from Explorer's Capsule setup. This helps avoiding out of memory errors.
* Removed -Xmx VM argument from Explorer's Capsule setup. This helps avoiding out of memory errors.
* New ``killFlow`` RPC for killing stuck flows.
* Shell now kills an ongoing flow when CTRL+C is pressed in the terminal.
@ -26,7 +34,8 @@ Unreleased
* Improved audit trail for ``FinalityFlow`` and related sub-flows.
* ``NodeStartup`` will now only print node's configuration if ``devMode`` is ``true``, avoiding the risk of printing passwords in a production setup.
* The node's configuration is only printed on startup if ``devMode`` is ``true``, avoiding the risk of printing passwords
in a production setup.
* SLF4J's MDC will now only be printed to the console if not empty. No more log lines ending with "{}".

View File

@ -0,0 +1,253 @@
Deterministic Corda Modules
===========================
A Corda contract's verify function should always produce the same results for the same input data. To that end,
Corda provides the following modules:
#. ``core-deterministic``
#. ``serialization-deterministic``
#. ``jdk8u-deterministic``
These are reduced version of Corda's ``core`` and ``serialization`` modules and the OpenJDK 8 ``rt.jar``, where the
non-deterministic functionality has been removed. The intention here is that all CorDapp classes required for
contract verification should be compiled against these modules to prevent them containing non-deterministic behaviour.
.. note:: These modules are only a development aid. They cannot guarantee determinism without also including
deterministic versions of all their dependent libraries, e.g. ``kotlin-stdlib``.
Generating the Deterministic Modules
------------------------------------
JDK 8
``jdk8u-deterministic`` is a "pseudo JDK" image that we can point the Java and Kotlin compilers to. It downloads the
``rt.jar`` containing a deterministic subset of the Java 8 APIs from the Artifactory.
To build a new version of this JAR and upload it to the Artifactory, see the ``create-jdk8u`` module. This is a
standalone Gradle project within the Corda repository that will clone the ``deterministic-jvm8`` branch of Corda's
`OpenJDK repository <https://github.com/corda/openjdk>`_ and then build it. (This currently requires a C++ compiler,
GNU Make and a UNIX-like development environment.)
Corda Modules
``core-deterministic`` and ``serialization-deterministic`` are generated from Corda's ``core`` and ``serialization``
modules respectively using both `ProGuard <https://www.guardsquare.com/en/proguard>`_ and Corda's ``JarFilter`` Gradle
plugin. Corda developers configure these tools by applying Corda's ``@Deterministic`` and ``@NonDeterministic``
annotations to elements of ``core`` and ``serialization`` as described `here <deterministic_annotations_>`_.
The build generates each of Corda's deterministic JARs in six steps:
#. Some *very few* classes in the original JAR must be replaced completely. This is typically because the original
class uses something like ``ThreadLocal``, which is not available in the deterministic Java APIs, and yet the
class is still required by the deterministic JAR. We must keep such classes to a minimum!
#. The patched JAR is analysed by ProGuard for the first time using the following rule:
.. sourcecode:: groovy
keep '@interface net.corda.core.Deterministic { *; }'
..
ProGuard works by calculating how much code is reachable from given "entry points", and in our case these entry
points are the ``@Deterministic`` classes. The unreachable classes are then discarded by ProGuard's ``shrink``
option.
#. The remaining classes may still contain non-deterministic code. However, there is no way of writing a ProGuard rule
explicitly to discard anything. Consider the following class:
.. sourcecode:: kotlin
@CordaSerializable
@Deterministic
data class UniqueIdentifier(val externalId: String?, val id: UUID) : Comparable<UniqueIdentifier> {
@NonDeterministic constructor(externalId: String?) : this(externalId, UUID.randomUUID())
@NonDeterministic constructor() : this(null)
...
}
..
While CorDapps will definitely need to handle ``UniqueIdentifier`` objects, both of the secondary constructors
generate a new random ``UUID`` and so are non-deterministic. Hence the next "determinising" step is to pass the
classes to the ``JarFilter`` tool, which strips out all of the elements which have been annotated as
``@NonDeterministic`` and stubs out any functions annotated with ``@NonDeterministicStub``. (Stub functions that
return a value will throw ``UnsupportedOperationException``, whereas ``void`` or ``Unit`` stubs will do nothing.)
#. After the ``@NonDeterministic`` elements have been filtered out, the classes are rescanned using ProGuard to remove
any more code that has now become unreachable.
#. The remaining classes define our deterministic subset. However, the ``@kotlin.Metadata`` annotations on the compiled
Kotlin classes still contain references to all of the functions and properties that ProGuard has deleted. Therefore
we now use the ``JarFilter`` to delete these references, as otherwise the Kotlin compiler will pretend that the
deleted functions and properties are still present.
#. Finally, we use ProGuard again to validate our JAR against the deterministic ``rt.jar``:
.. sourcecode:: groovy
task checkDeterminism(type: ProGuardTask, dependsOn: jdkTask) {
injars metafix
libraryjars "$deterministic_jdk_home/jre/lib/rt.jar"
configurations.runtimeLibraries.forEach {
libraryjars it.path, filter: '!META-INF/versions/**'
}
keepattributes '*'
dontpreverify
dontobfuscate
dontoptimize
verbose
keep 'class *'
}
..
This step will fail if ProGuard spots any Java API references that still cannot be satisfied by the deterministic
``rt.jar``, and hence it will break the build.
Testing the Deterministic Modules
---------------------------------
The ``core-deterministic:testing`` module executes some basic JUnit tests for the ``core-deterministic`` and
``serialization-deterministic`` JARs. These tests are compiled against the deterministic ``rt.jar``, although
they are still executed using the full JDK.
The ``testing`` module also has two sub-modules:
``core-deterministic:testing:data``
This module generates test data such as serialised transactions and elliptic curve key pairs using the full
non-deterministic ``core`` library and JDK. This data is all written into a single JAR which the ``testing``
module adds to its classpath.
``core-deterministic:testing:common``
This module provides the test classes which the ``testing`` and ``data`` modules need to share. It is therefore
compiled against the deterministic API subset.
.. _deterministic_annotations:
Applying @Deterministic and @NonDeterministic annotations
---------------------------------------------------------
Corda developers need to understand how to annotate classes in the ``core`` and ``serialization`` modules correctly
in order to maintain the deterministic JARs.
.. note:: Every Kotlin class still has its own ``.class`` file, even when all of those classes share the same
source file. Also, annotating the file:
.. sourcecode:: kotlin
@file:Deterministic
package net.corda.core.internal
..
*does not* automatically annotate any class declared *within* this file. It merely annotates any
accompanying Kotlin ``xxxKt`` class.
For more information about how ``JarFilter`` is processing the byte-code inside ``core`` and ``serialization``,
use Gradle's ``--info`` or ``--debug`` command-line options.
Deterministic Classes
Classes that *must* be included in the deterministic JAR should be annotated as ``@Deterministic``.
.. sourcecode:: kotlin
@Target(FILE, CLASS)
@Retention(BINARY)
@CordaInternal
annotation class Deterministic
..
To preserve any Kotlin functions, properties or type aliases that have been declared outside of a ``class``,
you should annotate the source file's ``package`` declaration instead:
.. sourcecode:: kotlin
@file:JvmName("InternalUtils")
@file:Deterministic
package net.corda.core.internal
infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(this, endExclusive)
..
Non-Deterministic Elements
Elements that *must* be deleted from classes in the deterministic JAR should be annotated as ``@NonDeterministic``.
.. sourcecode:: kotlin
@Target(
FILE,
CLASS,
CONSTRUCTOR,
FUNCTION,
PROPERTY_GETTER,
PROPERTY_SETTER,
PROPERTY,
FIELD,
TYPEALIAS
)
@Retention(BINARY)
@CordaInternal
annotation class NonDeterministic
..
You must also ensure that a deterministic class's primary constructor does not reference any classes that are
not available in the deterministic ``rt.jar``, nor have any non-deterministic default parameter values such as
``UUID.randomUUID()``. The biggest risk here would be that ``JarFilter`` would delete the primary constructor
and that the class could no longer be instantiated, although ``JarFilter`` will print a warning in this case.
However, it is also likely that the "determinised" class would have a different serialisation signature than
its non-deterministic version and so become unserialisable on the deterministic JVM.
Be aware that package-scoped Kotlin properties are all initialised within a common ``<clinit>`` block inside
their host ``.class`` file. This means that when ``JarFilter`` deletes these properties, it cannot also remove
their initialisation code. For example:
.. sourcecode:: kotlin
package net.corda.core
@NonDeterministic
val map: MutableMap<String, String> = ConcurrentHashMap()
..
In this case, ``JarFilter`` would delete the ``map`` property but the ``<clinit>`` block would still create
an instance of ``ConcurrentHashMap``. The solution here is to refactor the property into its own file and then
annotate the file itself as ``@NonDeterministic`` instead.
Non-Deterministic Function Stubs
Sometimes it is impossible to delete a function entirely. Or a function may have some non-deterministic code
embedded inside it that cannot be removed. For these rare cases, there is the ``@NonDeterministicStub``
annotation:
.. sourcecode:: kotlin
@Target(
CONSTRUCTOR,
FUNCTION,
PROPERTY_GETTER,
PROPERTY_SETTER
)
@Retention(BINARY)
@CordaInternal
annotation class NonDeterministicStub
..
This annotation instructs ``JarFilter`` to replace the function's body with either an empty body (for functions
that return ``void`` or ``Unit``) or one that throws ``UnsupportedOperationException``. For example:
.. sourcecode:: kotlin
fun necessaryCode() {
nonDeterministicOperations()
otherOperations()
}
@NonDeterministicStub
private fun nonDeterministicOperations() {
// etc
}
..

View File

@ -71,5 +71,6 @@ We look forward to seeing what you can do with Corda!
release-process-index.rst
corda-repo-layout.rst
deterministic-modules.rst
building-the-docs.rst
json.rst

View File

@ -1,33 +0,0 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.services.statemachine
/**
* A flow hospital is a class that is notified when a flow transitions into an error state due to an uncaught exception
* or internal error condition, and when it becomes clean again (e.g. due to a resume).
* Also see [net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor].
*/
interface FlowHospital {
/**
* The flow running in [flowFiber] has errored.
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>)
/**
* The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume.
*/
fun flowCleaned(flowFiber: FlowFiber)
/**
* The flow has been removed from the state machine.
*/
fun flowRemoved(flowFiber: FlowFiber)
}

View File

@ -1,35 +0,0 @@
/*
* R3 Proprietary and Confidential
*
* Copyright (c) 2018 R3 Limited. All rights reserved.
*
* The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law.
*
* Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited.
*/
package net.corda.node.services.statemachine
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
/**
* A simple [FlowHospital] implementation that immediately triggers error propagation when a flow dirties.
*/
object PropagatingFlowHospital : FlowHospital {
private val log = loggerFor<PropagatingFlowHospital>()
override fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
log.debug { "Flow ${flowFiber.id} in state $currentState encountered error" }
flowFiber.scheduleEvent(Event.StartErrorPropagation)
for ((index, error) in errors.withIndex()) {
log.warn("Flow ${flowFiber.id} is propagating error [$index] ", error)
}
}
override fun flowCleaned(flowFiber: FlowFiber) {
throw IllegalStateException("Flow ${flowFiber.id} cleaned after error propagation triggered")
}
override fun flowRemoved(flowFiber: FlowFiber) {}
}

View File

@ -99,6 +99,8 @@ class SingleThreadedStateMachineManager(
val timedFlows = HashMap<StateMachineRunId, ScheduledTimeout>()
}
override val flowHospital: StaffedFlowHospital = StaffedFlowHospital()
private val mutex = ThreadBox(InnerState())
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
@ -113,13 +115,11 @@ class SingleThreadedStateMachineManager(
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: SerializationContext? = null
private var tokenizableServices: List<Any>? = null
private var actionExecutor: ActionExecutor? = null
override val allStateMachines: List<FlowLogic<*>>
get() = mutex.locked { flows.values.map { it.fiber.logic } }
private val totalStartedFlows = metrics.counter("Flows.Started")
private val totalFinishedFlows = metrics.counter("Flows.Finished")
@ -133,7 +133,6 @@ class SingleThreadedStateMachineManager(
override fun start(tokenizableServices: List<Any>) {
checkQuasarJavaAgentPresence()
this.tokenizableServices = tokenizableServices
val checkpointSerializationContext = SerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
SerializeAsTokenContextImpl(tokenizableServices, SerializationDefaults.SERIALIZATION_FACTORY, SerializationDefaults.CHECKPOINT_CONTEXT, serviceHub)
)
@ -763,7 +762,7 @@ class SingleThreadedStateMachineManager(
private fun makeTransitionExecutor(): TransitionExecutor {
val interceptors = ArrayList<TransitionInterceptor>()
interceptors.add { HospitalisingInterceptor(StaffedFlowHospital, it) }
interceptors.add { HospitalisingInterceptor(flowHospital, it) }
if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) }
}
@ -787,7 +786,7 @@ class SingleThreadedStateMachineManager(
require(lastState.pendingDeduplicationHandlers.isEmpty())
require(lastState.isRemoved)
require(lastState.checkpoint.subFlowStack.size == 1)
sessionToFlow.none { it.value == flow.fiber.id }
require(flow.fiber.id !in sessionToFlow.values)
flow.resultFuture.set(removalReason.flowReturnValue)
lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue)))

View File

@ -1,89 +1,160 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.TimedFlow
import net.corda.core.utilities.loggerFor
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.messaging.DataFeed
import net.corda.core.utilities.contextLogger
import net.corda.node.services.FinalityHandler
import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.sql.SQLException
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.*
/**
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
*/
object StaffedFlowHospital : FlowHospital {
private val log = loggerFor<StaffedFlowHospital>()
class StaffedFlowHospital {
private companion object {
private val log = contextLogger()
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor)
}
private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout)
private val patients = ConcurrentHashMap<StateMachineRunId, MedicalHistory>()
val numberOfPatients get() = patients.size
private val mutex = ThreadBox(object {
val patients = HashMap<StateMachineRunId, MedicalHistory>()
val recordsPublisher = PublishSubject.create<MedicalRecord>()
})
class MedicalHistory {
val records: MutableList<Record> = mutableListOf()
sealed class Record(val suspendCount: Int) {
class Admitted(val at: Instant, suspendCount: Int) : Record(suspendCount) {
override fun toString() = "Admitted(at=$at, suspendCount=$suspendCount)"
}
class Discharged(val at: Instant, suspendCount: Int, val by: Staff, val error: Throwable) : Record(suspendCount) {
override fun toString() = "Discharged(at=$at, suspendCount=$suspendCount, by=$by)"
}
}
internal val records: MutableList<MedicalRecord> = mutableListOf()
fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean {
val lastAdmittanceSuspendCount = (records.last() as MedicalHistory.Record.Admitted).suspendCount
return records.filterIsInstance(MedicalHistory.Record.Discharged::class.java).filter { it.by == by && it.suspendCount == lastAdmittanceSuspendCount }.count() <= max
val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount
return records
.filterIsInstance<MedicalRecord.Discharged>()
.count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max
}
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
}
override fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
/**
* The flow running in [flowFiber] has errored.
*/
fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState")
val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() }
medicalHistory.records += MedicalHistory.Record.Admitted(Instant.now(), currentState.checkpoint.numberOfSuspends)
for ((index, error) in errors.withIndex()) {
log.info("Flow ${flowFiber.id} has error [$index]", error)
if (!errorIsDischarged(flowFiber, currentState, error, medicalHistory)) {
// If any error isn't discharged, then we propagate.
log.warn("Flow ${flowFiber.id} error was not discharged, propagating.")
flowFiber.scheduleEvent(Event.StartErrorPropagation)
return
val suspendCount = currentState.checkpoint.numberOfSuspends
val event = mutex.locked {
val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() }
val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount)
medicalHistory.records += admitted
recordsPublisher.onNext(admitted)
val report = consultStaff(flowFiber, currentState, errors, medicalHistory)
val (newRecord, event) = when (report.diagnosis) {
Diagnosis.DISCHARGE -> {
log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}")
Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null)
}
Diagnosis.NOT_MY_SPECIALTY -> {
// None of the staff care for these errors so we let them propagate
log.info("Flow ${flowFiber.id} error allowed to propagate")
Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation)
}
}
medicalHistory.records += newRecord
recordsPublisher.onNext(newRecord)
event
}
if (event != null) {
flowFiber.scheduleEvent(event)
}
// If all are discharged, retry.
flowFiber.scheduleEvent(Event.RetryFlowFromSafePoint)
}
private fun errorIsDischarged(flowFiber: FlowFiber, currentState: StateMachineState, error: Throwable, medicalHistory: MedicalHistory): Boolean {
for (staffMember in staff) {
val diagnosis = staffMember.consult(flowFiber, currentState, error, medicalHistory)
if (diagnosis == Diagnosis.DISCHARGE) {
medicalHistory.records += MedicalHistory.Record.Discharged(Instant.now(), currentState.checkpoint.numberOfSuspends, staffMember, error)
log.info("Flow ${flowFiber.id} error discharged from hospital by $staffMember")
return true
}
}
return false
private fun consultStaff(flowFiber: FlowFiber,
currentState: StateMachineState,
errors: List<Throwable>,
medicalHistory: MedicalHistory): ConsultationReport {
return errors
.mapIndexed { index, error ->
log.info("Flow ${flowFiber.id} has error [$index]", error)
val diagnoses: Map<Diagnosis, List<Staff>> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) }
// We're only interested in the highest priority diagnosis for the error
val (diagnosis, by) = diagnoses.entries.minBy { it.key }!!
ConsultationReport(error, diagnosis, by)
}
// And we're only interested in the error with the highest priority diagnosis
.minBy { it.diagnosis }!!
}
private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List<Staff>)
/**
* The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume.
*/
// It's okay for flows to be cleaned... we fix them now!
override fun flowCleaned(flowFiber: FlowFiber) {}
fun flowCleaned(flowFiber: FlowFiber) = Unit
override fun flowRemoved(flowFiber: FlowFiber) {
patients.remove(flowFiber.id)
/**
* The flow has been removed from the state machine.
*/
fun flowRemoved(flowFiber: FlowFiber) {
mutex.locked { patients.remove(flowFiber.id) }
}
// TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC
/** Returns a stream of medical records as flows pass through the hospital. */
fun track(): DataFeed<List<MedicalRecord>, MedicalRecord> {
return mutex.locked {
DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed())
}
}
sealed class MedicalRecord {
abstract val flowId: StateMachineRunId
abstract val at: Instant
abstract val suspendCount: Int
data class Admitted(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int) : MedicalRecord()
data class Discharged(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int,
val by: List<Staff>,
val errors: List<Throwable>) : MedicalRecord()
data class KeptInForObservation(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int,
val by: List<Staff>,
val errors: List<Throwable>) : MedicalRecord()
data class NothingWeCanDo(override val flowId: StateMachineRunId,
override val at: Instant,
override val suspendCount: Int) : MedicalRecord()
}
/** The order of the enum values are in priority order. */
enum class Diagnosis {
/**
* Retry from last safe point.
*/
/** Retry from last safe point. */
DISCHARGE,
/**
* Please try another member of staff.
*/
/** Park and await intervention. */
OVERNIGHT_OBSERVATION,
/** Please try another member of staff. */
NOT_MY_SPECIALTY
}
@ -122,7 +193,7 @@ object StaffedFlowHospital : FlowHospital {
}
private fun mentionsConstraintViolation(exception: Throwable?): Boolean {
return exception != null && (exception is org.hibernate.exception.ConstraintViolationException || mentionsConstraintViolation(exception.cause))
return exception != null && (exception is ConstraintViolationException || mentionsConstraintViolation(exception.cause))
}
}
@ -152,4 +223,13 @@ object StaffedFlowHospital : FlowHospital {
}
}
}
}
/**
* Parks [FinalityHandler]s for observation.
*/
object FinalityDoctor : Staff {
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis {
return if (currentState.flowLogic is FinalityHandler) Diagnosis.OVERNIGHT_OBSERVATION else Diagnosis.NOT_MY_SPECIALTY
}
}
}

View File

@ -92,6 +92,8 @@ interface StateMachineManager {
* The event may be replayed if a flow fails and attempts to retry.
*/
fun deliverExternalEvent(event: ExternalEvent)
val flowHospital: StaffedFlowHospital
}
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call

View File

@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
* transition.
*/
class HospitalisingInterceptor(
private val flowHospital: FlowHospital,
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
) : TransitionExecutor {
private val hospitalisedFlows = ConcurrentHashMap<StateMachineRunId, FlowFiber>()

View File

@ -0,0 +1,95 @@
package net.corda.node.services
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.finance.POUNDS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.issuedBy
import net.corda.node.internal.StartedNode
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
class FinalityHandlerTest {
private lateinit var mockNet: InternalMockNetwork
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `sent to flow hospital on error and attempted retry on node restart`() {
// Setup a network where only Alice has the finance CorDapp and it sends a cash tx to Bob who doesn't have the
// CorDapp. Bob's FinalityHandler will error when validating the tx.
mockNet = InternalMockNetwork(cordappPackages = emptyList())
val alice = mockNet.createNode(InternalMockNodeParameters(
legalName = ALICE_NAME,
extraCordappPackages = listOf("net.corda.finance.contracts.asset")
))
var bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
val stx = TransactionBuilder(mockNet.defaultNotaryIdentity).let {
Cash().generateIssue(
it,
1000.POUNDS.issuedBy(alice.info.singleIdentity().ref(0)),
bob.info.singleIdentity(),
mockNet.defaultNotaryIdentity
)
alice.services.signInitialTransaction(it)
}
val finalityHandlerIdFuture = bob.smm.track()
.updates
.filter { it.logic is FinalityHandler }
.map { it.logic.runId }
.toFuture()
val finalisedTx = alice.services.startFlow(FinalityFlow(stx)).run {
mockNet.runNetwork()
resultFuture.getOrThrow()
}
val finalityHandlerId = finalityHandlerIdFuture.getOrThrow()
bob.assertFlowSentForObservation(finalityHandlerId)
assertThat(bob.getTransaction(finalisedTx.id)).isNull()
bob = mockNet.restartNode(bob)
// Since we've not done anything to fix the orignal error, we expect the finality handler to be sent to the hospital
// again on restart
bob.assertFlowSentForObservation(finalityHandlerId)
assertThat(bob.getTransaction(finalisedTx.id)).isNull()
}
private fun StartedNode<*>.assertFlowSentForObservation(runId: StateMachineRunId) {
val keptInForObservation = smm.flowHospital
.track()
.let { it.updates.startWith(it.snapshot) }
.filter { it.flowId == runId }
.ofType(KeptInForObservation::class.java)
.toBlocking()
.first()
assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor)
}
private fun StartedNode<*>.getTransaction(id: SecureHash): SignedTransaction? {
return database.transaction {
services.validatedTransactions.getTransaction(id)
}
}
}

View File

@ -702,9 +702,6 @@ class FlowFrameworkPersistenceTests {
fun `flow loaded from checkpoint will respond to messages from before start`() {
aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
bobNode.internals.disableDBCloseOnStop()
bobNode.dispose() // kill receiver
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@ -762,14 +759,11 @@ class FlowFrameworkPersistenceTests {
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers
private inline fun <reified P : FlowLogic<*>> StartedNode<MockNode>.restartAndGetRestoredFlow() = internals.run {
disableDBCloseOnStop() // Handover DB to new node copy
stop()
val newNode = mockNet.createNode(InternalMockNodeParameters(id, configuration.myLegalName))
private inline fun <reified P : FlowLogic<*>> StartedNode<MockNode>.restartAndGetRestoredFlow(): P {
val newNode = mockNet.restartNode(this)
newNode.internals.acceptableLiveFiberCountOnStop = 1
manuallyCloseDB()
mockNet.runNetwork()
newNode.getSingleFlow<P>().first
return newNode.getSingleFlow<P>().first
}
private fun assertSessionTransfers(vararg expected: SessionTransfer) {

View File

@ -7,6 +7,8 @@ import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.packageName
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
@ -15,10 +17,13 @@ import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNetwork.MockNode
import net.corda.testing.node.internal.MessagingServiceSpy
import net.corda.testing.node.internal.newContext
import net.corda.testing.node.internal.setMessagingServiceSpy
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.hibernate.exception.ConstraintViolationException
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -30,21 +35,23 @@ import kotlin.test.assertNull
class RetryFlowMockTest {
private lateinit var mockNet: InternalMockNetwork
private lateinit var internalNodeA: StartedNode<InternalMockNetwork.MockNode>
private lateinit var internalNodeB: StartedNode<InternalMockNetwork.MockNode>
private lateinit var nodeA: StartedNode<MockNode>
private lateinit var nodeB: StartedNode<MockNode>
@Before
fun start() {
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.`package`.name))
internalNodeA = mockNet.createNode()
internalNodeB = mockNet.createNode()
mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.packageName))
nodeA = mockNet.createNode()
nodeB = mockNet.createNode()
mockNet.startNodes()
RetryFlow.count = 0
SendAndRetryFlow.count = 0
RetryInsertFlow.count = 0
}
private fun <T> StartedNode<InternalMockNetwork.MockNode>.startFlow(logic: FlowLogic<T>): CordaFuture<T> = this.services.startFlow(logic, this.services.newContext()).getOrThrow().resultFuture
private fun <T> StartedNode<MockNode>.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
return this.services.startFlow(logic, this.services.newContext()).flatMap { it.resultFuture }
}
@After
fun cleanUp() {
@ -53,14 +60,14 @@ class RetryFlowMockTest {
@Test
fun `Single retry`() {
assertEquals(Unit, internalNodeA.startFlow(RetryFlow(1)).get())
assertEquals(Unit, nodeA.startFlow(RetryFlow(1)).get())
assertEquals(2, RetryFlow.count)
}
@Test
fun `Retry forever`() {
Assertions.assertThatThrownBy {
internalNodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow()
assertThatThrownBy {
nodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow()
}.isInstanceOf(LimitedRetryCausingError::class.java)
assertEquals(5, RetryFlow.count)
}
@ -68,14 +75,14 @@ class RetryFlowMockTest {
@Test
fun `Retry does not set senderUUID`() {
val messagesSent = mutableListOf<Message>()
val partyB = internalNodeB.info.legalIdentities.first()
internalNodeA.setMessagingServiceSpy(object : MessagingServiceSpy(internalNodeA.network) {
val partyB = nodeB.info.legalIdentities.first()
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) {
override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) {
messagesSent.add(message)
messagingService.send(message, target)
}
})
internalNodeA.startFlow(SendAndRetryFlow(1, partyB)).get()
nodeA.startFlow(SendAndRetryFlow(1, partyB)).get()
assertNotNull(messagesSent.first().senderUUID)
assertNull(messagesSent.last().senderUUID)
assertEquals(2, SendAndRetryFlow.count)
@ -83,26 +90,25 @@ class RetryFlowMockTest {
@Test
fun `Retry duplicate insert`() {
assertEquals(Unit, internalNodeA.startFlow(RetryInsertFlow(1)).get())
assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get())
assertEquals(2, RetryInsertFlow.count)
}
@Test
fun `Patient records do not leak in hospital`() {
val patientCountBefore = StaffedFlowHospital.numberOfPatients
assertEquals(Unit, internalNodeA.startFlow(RetryFlow(1)).get())
assertEquals(Unit, nodeA.startFlow(RetryFlow(1)).get())
// Need to make sure the state machine has finished. Otherwise this test is flakey.
mockNet.waitQuiescent()
assertEquals(patientCountBefore, StaffedFlowHospital.numberOfPatients)
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
assertEquals(2, RetryFlow.count)
}
}
class LimitedRetryCausingError : org.hibernate.exception.ConstraintViolationException("Test message", SQLException(), "Test constraint")
class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint")
class RetryCausingError : SQLException("deadlock")
class RetryFlow(val i: Int) : FlowLogic<Unit>() {
class RetryFlow(private val i: Int) : FlowLogic<Unit>() {
companion object {
var count = 0
}
@ -121,7 +127,7 @@ class RetryFlow(val i: Int) : FlowLogic<Unit>() {
}
@InitiatingFlow
class SendAndRetryFlow(val i: Int, val other: Party) : FlowLogic<Unit>() {
class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic<Unit>() {
companion object {
var count = 0
}
@ -137,8 +143,9 @@ class SendAndRetryFlow(val i: Int, val other: Party) : FlowLogic<Unit>() {
}
}
@Suppress("unused")
@InitiatedBy(SendAndRetryFlow::class)
class ReceiveFlow2(val other: FlowSession) : FlowLogic<Unit>() {
class ReceiveFlow2(private val other: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val received = other.receive<String>().unwrap { it }
@ -146,7 +153,7 @@ class ReceiveFlow2(val other: FlowSession) : FlowLogic<Unit>() {
}
}
class RetryInsertFlow(val i: Int) : FlowLogic<Unit>() {
class RetryInsertFlow(private val i: Int) : FlowLogic<Unit>() {
companion object {
var count = 0
}
@ -166,4 +173,4 @@ class RetryInsertFlow(val i: Int) : FlowLogic<Unit>() {
val tx = DBTransactionStorage.DBTransaction("Foo")
contextTransaction.session.save(tx)
}
}
}

View File

@ -40,7 +40,6 @@ import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.junit.After
import org.junit.Test
@ -81,8 +80,7 @@ class NodePair(private val mockNet: InternalMockNetwork) {
client.services.startFlow(clientLogic)
while (!serverRunning.get()) mockNet.runNetwork(1)
if (rebootClient) {
client.dispose()
client = mockNet.createNode(InternalMockNodeParameters(client.internals.id))
client = mockNet.restartNode(client)
}
return uncheckedCast(client.smm.allStateMachines.single().stateMachine)
}

View File

@ -395,6 +395,17 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
return node
}
fun <N : MockNode> restartNode(node: StartedNode<N>, nodeFactory: (MockNodeArgs) -> N): StartedNode<N> {
node.internals.disableDBCloseOnStop()
node.dispose()
return createNode(
InternalMockNodeParameters(legalName = node.internals.configuration.myLegalName, forcedID = node.internals.id),
nodeFactory
)
}
fun restartNode(node: StartedNode<MockNode>): StartedNode<MockNode> = restartNode(node, defaultFactory)
fun baseDirectory(nodeId: Int): Path = filesystem.getPath("/nodes/$nodeId")
/**
@ -449,8 +460,7 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
messagingNetwork.stop()
}
// Test method to block until all scheduled activity, active flows
// and network activity has ceased.
/** Block until all scheduled activity, active flows and network activity has ceased. */
fun waitQuiescent() {
busyLatch.await()
}