diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt index 9b2f591211..36a7b1cd21 100644 --- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt @@ -16,6 +16,7 @@ import net.corda.core.internal.ResolveTransactionsFlow import net.corda.core.internal.pushToLoggingContext import net.corda.core.node.StatesToRecord import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.trace import net.corda.core.utilities.unwrap import java.security.SignatureException @@ -43,9 +44,9 @@ class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSideSess TransactionVerificationException::class) override fun call(): SignedTransaction { if (checkSufficientSignatures) { - logger.trace("Receiving a transaction from ${otherSideSession.counterparty}") + logger.trace { "Receiving a transaction from ${otherSideSession.counterparty}" } } else { - logger.trace("Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}") + logger.trace { "Receiving a transaction (but without checking the signatures) from ${otherSideSession.counterparty}" } } val stx = otherSideSession.receive().unwrap { it.pushToLoggingContext() diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 8257623faf..c1755ee690 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -388,7 +388,8 @@ fun TransactionBuilder.toWireTransaction(services: ServicesForResolution, serial fun TransactionBuilder.toLedgerTransaction(services: ServicesForResolution, serializationContext: SerializationContext) = toLedgerTransactionWithContext(services, serializationContext) /** Convenience method to get the package name of a class literal. */ -val KClass<*>.packageName: String get() = java.`package`.name +val KClass<*>.packageName: String get() = java.packageName +val Class<*>.packageName: String get() = `package`.name inline val Class<*>.isAbstractClass: Boolean get() = Modifier.isAbstract(modifiers) diff --git a/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt b/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt index 1db3abe500..cc8e1bfa8e 100644 --- a/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt +++ b/core/src/test/kotlin/net/corda/core/serialization/AttachmentSerializationTest.kt @@ -171,12 +171,11 @@ class AttachmentSerializationTest { } private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String { - client.dispose() - client = mockNet.createNode(InternalMockNodeParameters(client.internals.id, client.internals.configuration.myLegalName), nodeFactory = { args -> + client = mockNet.restartNode(client) { args -> object : InternalMockNetwork.MockNode(args) { override fun start() = super.start().apply { attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad } } - }) + } return (client.smm.allStateMachines[0].stateMachine.resultFuture.apply { mockNet.runNetwork() }.getOrThrow() as ClientResult).attachmentContent } diff --git a/docs/source/api-contract-constraints.rst b/docs/source/api-contract-constraints.rst index 8da496e45c..34d3b54f35 100644 --- a/docs/source/api-contract-constraints.rst +++ b/docs/source/api-contract-constraints.rst @@ -132,6 +132,22 @@ you require the hash of the node's installed app which supplies the specified co hash constraints, you almost always want "whatever the current code is" and not a hard-coded hash. So this automatic constraint placeholder is useful. +FinalityFlow +------------ + +It's possible to encounter contract contraint issues when notarising transactions with the ``FinalityFlow`` on a network +containing multiple versions of the same CorDapp. This will happen when using hash contraints or with zone contraints +if the zone whitelist has missing CorDapp versions. If a participating party fails to validate the **notarised** transaction +then we have a scenerio where the members of the network do not have a consistent view of the ledger. + +Therfore, if the finality handler flow (which is run on the counterparty) errors for any reason it will always be sent to +the flow hospital. From there it's suspended waiting to be retried on node restart. This gives the node operator the opportunity +to recover from those errors, which in the case of contract constraint voilations means either updating the CorDapp or +adding its hash to the zone whitelist. + +.. note:: This is a temporary issue in the current version of Corda, until we implement some missing features which will + enable a seemless handling of differences in CorDapp versions. + CorDapps as attachments ----------------------- diff --git a/docs/source/api-flows.rst b/docs/source/api-flows.rst index a4bdc011cf..7ff52ee634 100644 --- a/docs/source/api-flows.rst +++ b/docs/source/api-flows.rst @@ -519,6 +519,18 @@ We can also choose to send the transaction to additional parties who aren't one Only one party has to call ``FinalityFlow`` for a given transaction to be recorded by all participants. It does **not** need to be called by each participant individually. +Because the transaction has already been notarised and the input states consumed, if the participants when receiving the +transaction fail to verify it, or the receiving flow (the finality handler) fails due to some other error, we then have +the scenerio where not all parties have the correct up to date view of the ledger. To recover from this the finality handler +is automatically sent to the flow hospital where it's suspended and retried from its last checkpoint on node restart. +This gives the node operator the opportunity to recover from the error. Until the issue is resolved the node will continue +to retry the flow on each startup. + +.. note:: It's possible to forcibly terminate the erroring finality handler using the ``killFlow`` RPC but at the risk + of an inconsistent view of the ledger. + +.. note:: A future release will allow retrying hospitalised flows without restarting the node, i.e. via RPC. + CollectSignaturesFlow/SignTransactionFlow ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The list of parties who need to sign a transaction is dictated by the transaction's commands. Once we've signed a diff --git a/docs/source/building-the-docs.rst b/docs/source/building-the-docs.rst index 020c2b35fa..4ece4e4997 100644 --- a/docs/source/building-the-docs.rst +++ b/docs/source/building-the-docs.rst @@ -21,6 +21,11 @@ done by specifying the installation target on the command line: sudo -H pip install --install-option '--install-data=/usr/local' Sphinx sudo -H pip install --install-option '--install-data=/usr/local' sphinx_rtd_theme + +.. warning:: When installing Sphinx, you may see the following error message: "Found existing installation: six 1.4.1 + Cannot uninstall 'six'. It is a distutils installed project and thus we cannot accurately determine which files + belong to it which would lead to only a partial uninstall.". If so, run the install with the + ``--ignore-installed six`` flag. Build ----- diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 107045daed..759831aa2a 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -8,11 +8,19 @@ Unreleased ========== * Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code. +* Introducing the flow hospital - a component of the node that manages flows that have errored and whether they should + be retried from their previous checkpoints or have their errors propagate. Currently it will respond to any error that + occurs during the resolution of a received transaction as part of ``FinalityFlow``. In such a scenerio the receiving + flow will be parked and retried on node restart. This is to allow the node operator to rectify the situation as otherwise + the node will have an incomplete view of the ledger. + * Fixed an issue preventing out of process nodes started by the ``Driver`` from logging to file. * Fixed an issue with ``CashException`` not being able to deserialise after the introduction of AMQP for RPC. -* Removed -xmx VM argument from Explorer's Capsule setup. This helps avoiding out of memory errors. +* Removed -Xmx VM argument from Explorer's Capsule setup. This helps avoiding out of memory errors. + +* New ``killFlow`` RPC for killing stuck flows. * Shell now kills an ongoing flow when CTRL+C is pressed in the terminal. @@ -26,7 +34,8 @@ Unreleased * Improved audit trail for ``FinalityFlow`` and related sub-flows. -* ``NodeStartup`` will now only print node's configuration if ``devMode`` is ``true``, avoiding the risk of printing passwords in a production setup. +* The node's configuration is only printed on startup if ``devMode`` is ``true``, avoiding the risk of printing passwords + in a production setup. * SLF4J's MDC will now only be printed to the console if not empty. No more log lines ending with "{}". diff --git a/docs/source/deterministic-modules.rst b/docs/source/deterministic-modules.rst new file mode 100644 index 0000000000..435b75c7fc --- /dev/null +++ b/docs/source/deterministic-modules.rst @@ -0,0 +1,253 @@ +Deterministic Corda Modules +=========================== + +A Corda contract's verify function should always produce the same results for the same input data. To that end, +Corda provides the following modules: + + #. ``core-deterministic`` + #. ``serialization-deterministic`` + #. ``jdk8u-deterministic`` + +These are reduced version of Corda's ``core`` and ``serialization`` modules and the OpenJDK 8 ``rt.jar``, where the +non-deterministic functionality has been removed. The intention here is that all CorDapp classes required for +contract verification should be compiled against these modules to prevent them containing non-deterministic behaviour. + +.. note:: These modules are only a development aid. They cannot guarantee determinism without also including + deterministic versions of all their dependent libraries, e.g. ``kotlin-stdlib``. + +Generating the Deterministic Modules +------------------------------------ + +JDK 8 + ``jdk8u-deterministic`` is a "pseudo JDK" image that we can point the Java and Kotlin compilers to. It downloads the + ``rt.jar`` containing a deterministic subset of the Java 8 APIs from the Artifactory. + + To build a new version of this JAR and upload it to the Artifactory, see the ``create-jdk8u`` module. This is a + standalone Gradle project within the Corda repository that will clone the ``deterministic-jvm8`` branch of Corda's + `OpenJDK repository `_ and then build it. (This currently requires a C++ compiler, + GNU Make and a UNIX-like development environment.) + +Corda Modules + ``core-deterministic`` and ``serialization-deterministic`` are generated from Corda's ``core`` and ``serialization`` + modules respectively using both `ProGuard `_ and Corda's ``JarFilter`` Gradle + plugin. Corda developers configure these tools by applying Corda's ``@Deterministic`` and ``@NonDeterministic`` + annotations to elements of ``core`` and ``serialization`` as described `here `_. + +The build generates each of Corda's deterministic JARs in six steps: + + #. Some *very few* classes in the original JAR must be replaced completely. This is typically because the original + class uses something like ``ThreadLocal``, which is not available in the deterministic Java APIs, and yet the + class is still required by the deterministic JAR. We must keep such classes to a minimum! + #. The patched JAR is analysed by ProGuard for the first time using the following rule: + + .. sourcecode:: groovy + + keep '@interface net.corda.core.Deterministic { *; }' + + .. + + ProGuard works by calculating how much code is reachable from given "entry points", and in our case these entry + points are the ``@Deterministic`` classes. The unreachable classes are then discarded by ProGuard's ``shrink`` + option. + #. The remaining classes may still contain non-deterministic code. However, there is no way of writing a ProGuard rule + explicitly to discard anything. Consider the following class: + + .. sourcecode:: kotlin + + @CordaSerializable + @Deterministic + data class UniqueIdentifier(val externalId: String?, val id: UUID) : Comparable { + @NonDeterministic constructor(externalId: String?) : this(externalId, UUID.randomUUID()) + @NonDeterministic constructor() : this(null) + ... + } + + .. + + While CorDapps will definitely need to handle ``UniqueIdentifier`` objects, both of the secondary constructors + generate a new random ``UUID`` and so are non-deterministic. Hence the next "determinising" step is to pass the + classes to the ``JarFilter`` tool, which strips out all of the elements which have been annotated as + ``@NonDeterministic`` and stubs out any functions annotated with ``@NonDeterministicStub``. (Stub functions that + return a value will throw ``UnsupportedOperationException``, whereas ``void`` or ``Unit`` stubs will do nothing.) + #. After the ``@NonDeterministic`` elements have been filtered out, the classes are rescanned using ProGuard to remove + any more code that has now become unreachable. + #. The remaining classes define our deterministic subset. However, the ``@kotlin.Metadata`` annotations on the compiled + Kotlin classes still contain references to all of the functions and properties that ProGuard has deleted. Therefore + we now use the ``JarFilter`` to delete these references, as otherwise the Kotlin compiler will pretend that the + deleted functions and properties are still present. + #. Finally, we use ProGuard again to validate our JAR against the deterministic ``rt.jar``: + + .. sourcecode:: groovy + + task checkDeterminism(type: ProGuardTask, dependsOn: jdkTask) { + injars metafix + + libraryjars "$deterministic_jdk_home/jre/lib/rt.jar" + + configurations.runtimeLibraries.forEach { + libraryjars it.path, filter: '!META-INF/versions/**' + } + + keepattributes '*' + dontpreverify + dontobfuscate + dontoptimize + verbose + + keep 'class *' + } + + .. + + This step will fail if ProGuard spots any Java API references that still cannot be satisfied by the deterministic + ``rt.jar``, and hence it will break the build. + +Testing the Deterministic Modules +--------------------------------- + +The ``core-deterministic:testing`` module executes some basic JUnit tests for the ``core-deterministic`` and +``serialization-deterministic`` JARs. These tests are compiled against the deterministic ``rt.jar``, although +they are still executed using the full JDK. + +The ``testing`` module also has two sub-modules: + +``core-deterministic:testing:data`` + This module generates test data such as serialised transactions and elliptic curve key pairs using the full + non-deterministic ``core`` library and JDK. This data is all written into a single JAR which the ``testing`` + module adds to its classpath. + +``core-deterministic:testing:common`` + This module provides the test classes which the ``testing`` and ``data`` modules need to share. It is therefore + compiled against the deterministic API subset. + + +.. _deterministic_annotations: + +Applying @Deterministic and @NonDeterministic annotations +--------------------------------------------------------- + +Corda developers need to understand how to annotate classes in the ``core`` and ``serialization`` modules correctly +in order to maintain the deterministic JARs. + +.. note:: Every Kotlin class still has its own ``.class`` file, even when all of those classes share the same + source file. Also, annotating the file: + + .. sourcecode:: kotlin + + @file:Deterministic + package net.corda.core.internal + + .. + + *does not* automatically annotate any class declared *within* this file. It merely annotates any + accompanying Kotlin ``xxxKt`` class. + +For more information about how ``JarFilter`` is processing the byte-code inside ``core`` and ``serialization``, +use Gradle's ``--info`` or ``--debug`` command-line options. + +Deterministic Classes + Classes that *must* be included in the deterministic JAR should be annotated as ``@Deterministic``. + + .. sourcecode:: kotlin + + @Target(FILE, CLASS) + @Retention(BINARY) + @CordaInternal + annotation class Deterministic + .. + + To preserve any Kotlin functions, properties or type aliases that have been declared outside of a ``class``, + you should annotate the source file's ``package`` declaration instead: + + .. sourcecode:: kotlin + + @file:JvmName("InternalUtils") + @file:Deterministic + package net.corda.core.internal + + infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(this, endExclusive) + + .. + +Non-Deterministic Elements + Elements that *must* be deleted from classes in the deterministic JAR should be annotated as ``@NonDeterministic``. + + .. sourcecode:: kotlin + + @Target( + FILE, + CLASS, + CONSTRUCTOR, + FUNCTION, + PROPERTY_GETTER, + PROPERTY_SETTER, + PROPERTY, + FIELD, + TYPEALIAS + ) + @Retention(BINARY) + @CordaInternal + annotation class NonDeterministic + + .. + + You must also ensure that a deterministic class's primary constructor does not reference any classes that are + not available in the deterministic ``rt.jar``, nor have any non-deterministic default parameter values such as + ``UUID.randomUUID()``. The biggest risk here would be that ``JarFilter`` would delete the primary constructor + and that the class could no longer be instantiated, although ``JarFilter`` will print a warning in this case. + However, it is also likely that the "determinised" class would have a different serialisation signature than + its non-deterministic version and so become unserialisable on the deterministic JVM. + + Be aware that package-scoped Kotlin properties are all initialised within a common ```` block inside + their host ``.class`` file. This means that when ``JarFilter`` deletes these properties, it cannot also remove + their initialisation code. For example: + + .. sourcecode:: kotlin + + package net.corda.core + + @NonDeterministic + val map: MutableMap = ConcurrentHashMap() + + .. + + In this case, ``JarFilter`` would delete the ``map`` property but the ```` block would still create + an instance of ``ConcurrentHashMap``. The solution here is to refactor the property into its own file and then + annotate the file itself as ``@NonDeterministic`` instead. + +Non-Deterministic Function Stubs + Sometimes it is impossible to delete a function entirely. Or a function may have some non-deterministic code + embedded inside it that cannot be removed. For these rare cases, there is the ``@NonDeterministicStub`` + annotation: + + .. sourcecode:: kotlin + + @Target( + CONSTRUCTOR, + FUNCTION, + PROPERTY_GETTER, + PROPERTY_SETTER + ) + @Retention(BINARY) + @CordaInternal + annotation class NonDeterministicStub + + .. + + This annotation instructs ``JarFilter`` to replace the function's body with either an empty body (for functions + that return ``void`` or ``Unit``) or one that throws ``UnsupportedOperationException``. For example: + + .. sourcecode:: kotlin + + fun necessaryCode() { + nonDeterministicOperations() + otherOperations() + } + + @NonDeterministicStub + private fun nonDeterministicOperations() { + // etc + } + + .. + diff --git a/docs/source/index.rst b/docs/source/index.rst index d0f9cbb4b9..f2ce85fe13 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -71,5 +71,6 @@ We look forward to seeing what you can do with Corda! release-process-index.rst corda-repo-layout.rst + deterministic-modules.rst building-the-docs.rst json.rst diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowHospital.kt deleted file mode 100644 index 994127c3b1..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowHospital.kt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.node.services.statemachine - -/** - * A flow hospital is a class that is notified when a flow transitions into an error state due to an uncaught exception - * or internal error condition, and when it becomes clean again (e.g. due to a resume). - * Also see [net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor]. - */ -interface FlowHospital { - /** - * The flow running in [flowFiber] has errored. - */ - fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) - - /** - * The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume. - */ - fun flowCleaned(flowFiber: FlowFiber) - - /** - * The flow has been removed from the state machine. - */ - fun flowRemoved(flowFiber: FlowFiber) -} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 4e06845d80..835238333a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -102,6 +102,8 @@ class MultiThreadedStateMachineManager( val timedFlows = ConcurrentHashMap() } + override val flowHospital: StaffedFlowHospital = StaffedFlowHospital() + private val concurrentBox = ConcurrentBox(InnerState()) private val scheduler = FiberExecutorScheduler("Flow fiber scheduler", executor) @@ -760,7 +762,7 @@ class MultiThreadedStateMachineManager( private fun makeTransitionExecutor(): TransitionExecutor { val interceptors = ArrayList() - interceptors.add { HospitalisingInterceptor(StaffedFlowHospital, it) } + interceptors.add { HospitalisingInterceptor(flowHospital, it) } if (serviceHub.configuration.devMode) { interceptors.add { DumpHistoryOnErrorInterceptor(it) } interceptors.add { MetricInterceptor(metrics, it) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/PropagatingFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/PropagatingFlowHospital.kt deleted file mode 100644 index 120f671e83..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/PropagatingFlowHospital.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * R3 Proprietary and Confidential - * - * Copyright (c) 2018 R3 Limited. All rights reserved. - * - * The intellectual and technical concepts contained herein are proprietary to R3 and its suppliers and are protected by trade secret law. - * - * Distribution of this file or any portion thereof via any medium without the express permission of R3 is strictly prohibited. - */ - -package net.corda.node.services.statemachine - -import net.corda.core.utilities.debug -import net.corda.core.utilities.loggerFor - -/** - * A simple [FlowHospital] implementation that immediately triggers error propagation when a flow dirties. - */ -object PropagatingFlowHospital : FlowHospital { - private val log = loggerFor() - - override fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { - log.debug { "Flow ${flowFiber.id} in state $currentState encountered error" } - flowFiber.scheduleEvent(Event.StartErrorPropagation) - for ((index, error) in errors.withIndex()) { - log.warn("Flow ${flowFiber.id} is propagating error [$index] ", error) - } - } - - override fun flowCleaned(flowFiber: FlowFiber) { - throw IllegalStateException("Flow ${flowFiber.id} cleaned after error propagation triggered") - } - - override fun flowRemoved(flowFiber: FlowFiber) {} -} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 3a6bdb8b0c..52d326fa21 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -99,6 +99,8 @@ class SingleThreadedStateMachineManager( val timedFlows = HashMap() } + override val flowHospital: StaffedFlowHospital = StaffedFlowHospital() + private val mutex = ThreadBox(InnerState()) private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor) private val timeoutScheduler = Executors.newScheduledThreadPool(1) @@ -113,13 +115,11 @@ class SingleThreadedStateMachineManager( private val ourSenderUUID = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: SerializationContext? = null - private var tokenizableServices: List? = null private var actionExecutor: ActionExecutor? = null override val allStateMachines: List> get() = mutex.locked { flows.values.map { it.fiber.logic } } - private val totalStartedFlows = metrics.counter("Flows.Started") private val totalFinishedFlows = metrics.counter("Flows.Finished") @@ -133,7 +133,6 @@ class SingleThreadedStateMachineManager( override fun start(tokenizableServices: List) { checkQuasarJavaAgentPresence() - this.tokenizableServices = tokenizableServices val checkpointSerializationContext = SerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( SerializeAsTokenContextImpl(tokenizableServices, SerializationDefaults.SERIALIZATION_FACTORY, SerializationDefaults.CHECKPOINT_CONTEXT, serviceHub) ) @@ -763,7 +762,7 @@ class SingleThreadedStateMachineManager( private fun makeTransitionExecutor(): TransitionExecutor { val interceptors = ArrayList() - interceptors.add { HospitalisingInterceptor(StaffedFlowHospital, it) } + interceptors.add { HospitalisingInterceptor(flowHospital, it) } if (serviceHub.configuration.devMode) { interceptors.add { DumpHistoryOnErrorInterceptor(it) } } @@ -787,7 +786,7 @@ class SingleThreadedStateMachineManager( require(lastState.pendingDeduplicationHandlers.isEmpty()) require(lastState.isRemoved) require(lastState.checkpoint.subFlowStack.size == 1) - sessionToFlow.none { it.value == flow.fiber.id } + require(flow.fiber.id !in sessionToFlow.values) flow.resultFuture.set(removalReason.flowReturnValue) lastState.flowLogic.progressTracker?.currentStep = ProgressTracker.DONE changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Success(removalReason.flowReturnValue))) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt index eb7806e9b8..a08ebada2e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt @@ -1,89 +1,160 @@ package net.corda.node.services.statemachine import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.ThreadBox import net.corda.core.internal.TimedFlow -import net.corda.core.utilities.loggerFor +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.messaging.DataFeed +import net.corda.core.utilities.contextLogger +import net.corda.node.services.FinalityHandler +import org.hibernate.exception.ConstraintViolationException +import rx.subjects.PublishSubject import java.sql.SQLException import java.time.Instant -import java.util.concurrent.ConcurrentHashMap +import java.util.* /** * This hospital consults "staff" to see if they can automatically diagnose and treat flows. */ -object StaffedFlowHospital : FlowHospital { - private val log = loggerFor() +class StaffedFlowHospital { + private companion object { + private val log = contextLogger() + private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout, FinalityDoctor) + } - private val staff = listOf(DeadlockNurse, DuplicateInsertSpecialist, DoctorTimeout) - - private val patients = ConcurrentHashMap() - - val numberOfPatients get() = patients.size + private val mutex = ThreadBox(object { + val patients = HashMap() + val recordsPublisher = PublishSubject.create() + }) class MedicalHistory { - val records: MutableList = mutableListOf() - - sealed class Record(val suspendCount: Int) { - class Admitted(val at: Instant, suspendCount: Int) : Record(suspendCount) { - override fun toString() = "Admitted(at=$at, suspendCount=$suspendCount)" - } - - class Discharged(val at: Instant, suspendCount: Int, val by: Staff, val error: Throwable) : Record(suspendCount) { - override fun toString() = "Discharged(at=$at, suspendCount=$suspendCount, by=$by)" - } - } + internal val records: MutableList = mutableListOf() fun notDischargedForTheSameThingMoreThan(max: Int, by: Staff): Boolean { - val lastAdmittanceSuspendCount = (records.last() as MedicalHistory.Record.Admitted).suspendCount - return records.filterIsInstance(MedicalHistory.Record.Discharged::class.java).filter { it.by == by && it.suspendCount == lastAdmittanceSuspendCount }.count() <= max + val lastAdmittanceSuspendCount = (records.last() as MedicalRecord.Admitted).suspendCount + return records + .filterIsInstance() + .count { by in it.by && it.suspendCount == lastAdmittanceSuspendCount } <= max } override fun toString(): String = "${this.javaClass.simpleName}(records = $records)" } - override fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { + /** + * The flow running in [flowFiber] has errored. + */ + fun flowErrored(flowFiber: FlowFiber, currentState: StateMachineState, errors: List) { log.info("Flow ${flowFiber.id} admitted to hospital in state $currentState") - val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() } - medicalHistory.records += MedicalHistory.Record.Admitted(Instant.now(), currentState.checkpoint.numberOfSuspends) - for ((index, error) in errors.withIndex()) { - log.info("Flow ${flowFiber.id} has error [$index]", error) - if (!errorIsDischarged(flowFiber, currentState, error, medicalHistory)) { - // If any error isn't discharged, then we propagate. - log.warn("Flow ${flowFiber.id} error was not discharged, propagating.") - flowFiber.scheduleEvent(Event.StartErrorPropagation) - return + val suspendCount = currentState.checkpoint.numberOfSuspends + + val event = mutex.locked { + val medicalHistory = patients.computeIfAbsent(flowFiber.id) { MedicalHistory() } + + val admitted = MedicalRecord.Admitted(flowFiber.id, Instant.now(), suspendCount) + medicalHistory.records += admitted + recordsPublisher.onNext(admitted) + + val report = consultStaff(flowFiber, currentState, errors, medicalHistory) + + val (newRecord, event) = when (report.diagnosis) { + Diagnosis.DISCHARGE -> { + log.info("Flow ${flowFiber.id} error discharged from hospital by ${report.by}") + Pair(MedicalRecord.Discharged(flowFiber.id, Instant.now(), suspendCount, report.by, errors), Event.RetryFlowFromSafePoint) + } + Diagnosis.OVERNIGHT_OBSERVATION -> { + log.info("Flow ${flowFiber.id} error kept for overnight observation by ${report.by}") + // We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart + Pair(MedicalRecord.KeptInForObservation(flowFiber.id, Instant.now(), suspendCount, report.by, errors), null) + } + Diagnosis.NOT_MY_SPECIALTY -> { + // None of the staff care for these errors so we let them propagate + log.info("Flow ${flowFiber.id} error allowed to propagate") + Pair(MedicalRecord.NothingWeCanDo(flowFiber.id, Instant.now(), suspendCount), Event.StartErrorPropagation) + } } + + medicalHistory.records += newRecord + recordsPublisher.onNext(newRecord) + event + } + + if (event != null) { + flowFiber.scheduleEvent(event) } - // If all are discharged, retry. - flowFiber.scheduleEvent(Event.RetryFlowFromSafePoint) } - private fun errorIsDischarged(flowFiber: FlowFiber, currentState: StateMachineState, error: Throwable, medicalHistory: MedicalHistory): Boolean { - for (staffMember in staff) { - val diagnosis = staffMember.consult(flowFiber, currentState, error, medicalHistory) - if (diagnosis == Diagnosis.DISCHARGE) { - medicalHistory.records += MedicalHistory.Record.Discharged(Instant.now(), currentState.checkpoint.numberOfSuspends, staffMember, error) - log.info("Flow ${flowFiber.id} error discharged from hospital by $staffMember") - return true - } - } - return false + private fun consultStaff(flowFiber: FlowFiber, + currentState: StateMachineState, + errors: List, + medicalHistory: MedicalHistory): ConsultationReport { + return errors + .mapIndexed { index, error -> + log.info("Flow ${flowFiber.id} has error [$index]", error) + val diagnoses: Map> = staff.groupBy { it.consult(flowFiber, currentState, error, medicalHistory) } + // We're only interested in the highest priority diagnosis for the error + val (diagnosis, by) = diagnoses.entries.minBy { it.key }!! + ConsultationReport(error, diagnosis, by) + } + // And we're only interested in the error with the highest priority diagnosis + .minBy { it.diagnosis }!! } + private data class ConsultationReport(val error: Throwable, val diagnosis: Diagnosis, val by: List) + + /** + * The flow running in [flowFiber] has cleaned, possibly as a result of a flow hospital resume. + */ // It's okay for flows to be cleaned... we fix them now! - override fun flowCleaned(flowFiber: FlowFiber) {} + fun flowCleaned(flowFiber: FlowFiber) = Unit - override fun flowRemoved(flowFiber: FlowFiber) { - patients.remove(flowFiber.id) + /** + * The flow has been removed from the state machine. + */ + fun flowRemoved(flowFiber: FlowFiber) { + mutex.locked { patients.remove(flowFiber.id) } } + // TODO MedicalRecord subtypes can expose the Staff class, something which we probably don't want when wiring this method to RPC + /** Returns a stream of medical records as flows pass through the hospital. */ + fun track(): DataFeed, MedicalRecord> { + return mutex.locked { + DataFeed(patients.values.flatMap { it.records }, recordsPublisher.bufferUntilSubscribed()) + } + } + + sealed class MedicalRecord { + abstract val flowId: StateMachineRunId + abstract val at: Instant + abstract val suspendCount: Int + + data class Admitted(override val flowId: StateMachineRunId, + override val at: Instant, + override val suspendCount: Int) : MedicalRecord() + + data class Discharged(override val flowId: StateMachineRunId, + override val at: Instant, + override val suspendCount: Int, + val by: List, + val errors: List) : MedicalRecord() + + data class KeptInForObservation(override val flowId: StateMachineRunId, + override val at: Instant, + override val suspendCount: Int, + val by: List, + val errors: List) : MedicalRecord() + + data class NothingWeCanDo(override val flowId: StateMachineRunId, + override val at: Instant, + override val suspendCount: Int) : MedicalRecord() + } + + /** The order of the enum values are in priority order. */ enum class Diagnosis { - /** - * Retry from last safe point. - */ + /** Retry from last safe point. */ DISCHARGE, - /** - * Please try another member of staff. - */ + /** Park and await intervention. */ + OVERNIGHT_OBSERVATION, + /** Please try another member of staff. */ NOT_MY_SPECIALTY } @@ -122,7 +193,7 @@ object StaffedFlowHospital : FlowHospital { } private fun mentionsConstraintViolation(exception: Throwable?): Boolean { - return exception != null && (exception is org.hibernate.exception.ConstraintViolationException || mentionsConstraintViolation(exception.cause)) + return exception != null && (exception is ConstraintViolationException || mentionsConstraintViolation(exception.cause)) } } @@ -152,4 +223,13 @@ object StaffedFlowHospital : FlowHospital { } } } -} \ No newline at end of file + + /** + * Parks [FinalityHandler]s for observation. + */ + object FinalityDoctor : Staff { + override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: MedicalHistory): Diagnosis { + return if (currentState.flowLogic is FinalityHandler) Diagnosis.OVERNIGHT_OBSERVATION else Diagnosis.NOT_MY_SPECIALTY + } + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index d7f4166829..c69590f71f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -92,6 +92,8 @@ interface StateMachineManager { * The event may be replayed if a flow fails and attempts to retry. */ fun deliverExternalEvent(event: ExternalEvent) + + val flowHospital: StaffedFlowHospital } // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt index 7d480f5f9f..231a3b2467 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/interceptors/HospitalisingInterceptor.kt @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap * transition. */ class HospitalisingInterceptor( - private val flowHospital: FlowHospital, + private val flowHospital: StaffedFlowHospital, private val delegate: TransitionExecutor ) : TransitionExecutor { private val hospitalisedFlows = ConcurrentHashMap() diff --git a/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt b/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt new file mode 100644 index 0000000000..8f4dfc408e --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/FinalityHandlerTest.kt @@ -0,0 +1,95 @@ +package net.corda.node.services + +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.StateMachineRunId +import net.corda.core.toFuture +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow +import net.corda.finance.POUNDS +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.issuedBy +import net.corda.node.internal.StartedNode +import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.node.services.statemachine.StaffedFlowHospital.MedicalRecord.KeptInForObservation +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.startFlow +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Test + +class FinalityHandlerTest { + private lateinit var mockNet: InternalMockNetwork + + @After + fun cleanUp() { + mockNet.stopNodes() + } + + @Test + fun `sent to flow hospital on error and attempted retry on node restart`() { + // Setup a network where only Alice has the finance CorDapp and it sends a cash tx to Bob who doesn't have the + // CorDapp. Bob's FinalityHandler will error when validating the tx. + mockNet = InternalMockNetwork(cordappPackages = emptyList()) + + val alice = mockNet.createNode(InternalMockNodeParameters( + legalName = ALICE_NAME, + extraCordappPackages = listOf("net.corda.finance.contracts.asset") + )) + + var bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) + + val stx = TransactionBuilder(mockNet.defaultNotaryIdentity).let { + Cash().generateIssue( + it, + 1000.POUNDS.issuedBy(alice.info.singleIdentity().ref(0)), + bob.info.singleIdentity(), + mockNet.defaultNotaryIdentity + ) + alice.services.signInitialTransaction(it) + } + + val finalityHandlerIdFuture = bob.smm.track() + .updates + .filter { it.logic is FinalityHandler } + .map { it.logic.runId } + .toFuture() + + val finalisedTx = alice.services.startFlow(FinalityFlow(stx)).run { + mockNet.runNetwork() + resultFuture.getOrThrow() + } + val finalityHandlerId = finalityHandlerIdFuture.getOrThrow() + + bob.assertFlowSentForObservation(finalityHandlerId) + assertThat(bob.getTransaction(finalisedTx.id)).isNull() + + bob = mockNet.restartNode(bob) + // Since we've not done anything to fix the orignal error, we expect the finality handler to be sent to the hospital + // again on restart + bob.assertFlowSentForObservation(finalityHandlerId) + assertThat(bob.getTransaction(finalisedTx.id)).isNull() + } + + private fun StartedNode<*>.assertFlowSentForObservation(runId: StateMachineRunId) { + val keptInForObservation = smm.flowHospital + .track() + .let { it.updates.startWith(it.snapshot) } + .filter { it.flowId == runId } + .ofType(KeptInForObservation::class.java) + .toBlocking() + .first() + assertThat(keptInForObservation.by).contains(StaffedFlowHospital.FinalityDoctor) + } + + private fun StartedNode<*>.getTransaction(id: SecureHash): SignedTransaction? { + return database.transaction { + services.validatedTransactions.getTransaction(id) + } + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 052a5c4037..9806f83078 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -702,9 +702,6 @@ class FlowFrameworkPersistenceTests { fun `flow loaded from checkpoint will respond to messages from before start`() { aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow - // Make sure the add() has finished initial processing. - bobNode.internals.disableDBCloseOnStop() - bobNode.dispose() // kill receiver val restoredFlow = bobNode.restartAndGetRestoredFlow() assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") } @@ -762,14 +759,11 @@ class FlowFrameworkPersistenceTests { //////////////////////////////////////////////////////////////////////////////////////////////////////////// //region Helpers - private inline fun > StartedNode.restartAndGetRestoredFlow() = internals.run { - disableDBCloseOnStop() // Handover DB to new node copy - stop() - val newNode = mockNet.createNode(InternalMockNodeParameters(id, configuration.myLegalName)) + private inline fun > StartedNode.restartAndGetRestoredFlow(): P { + val newNode = mockNet.restartNode(this) newNode.internals.acceptableLiveFiberCountOnStop = 1 - manuallyCloseDB() mockNet.runNetwork() - newNode.getSingleFlow

().first + return newNode.getSingleFlow

().first } private fun assertSessionTransfers(vararg expected: SessionTransfer) { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index dbcd28796a..fca7e8687b 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -7,6 +7,8 @@ import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.flatMap +import net.corda.core.internal.packageName import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap @@ -15,10 +17,13 @@ import net.corda.node.services.messaging.Message import net.corda.node.services.persistence.DBTransactionStorage import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNetwork.MockNode import net.corda.testing.node.internal.MessagingServiceSpy import net.corda.testing.node.internal.newContext import net.corda.testing.node.internal.setMessagingServiceSpy -import org.assertj.core.api.Assertions +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.hibernate.exception.ConstraintViolationException import org.junit.After import org.junit.Before import org.junit.Test @@ -30,21 +35,23 @@ import kotlin.test.assertNull class RetryFlowMockTest { private lateinit var mockNet: InternalMockNetwork - private lateinit var internalNodeA: StartedNode - private lateinit var internalNodeB: StartedNode + private lateinit var nodeA: StartedNode + private lateinit var nodeB: StartedNode @Before fun start() { - mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.`package`.name)) - internalNodeA = mockNet.createNode() - internalNodeB = mockNet.createNode() + mockNet = InternalMockNetwork(threadPerNode = true, cordappPackages = listOf(this.javaClass.packageName)) + nodeA = mockNet.createNode() + nodeB = mockNet.createNode() mockNet.startNodes() RetryFlow.count = 0 SendAndRetryFlow.count = 0 RetryInsertFlow.count = 0 } - private fun StartedNode.startFlow(logic: FlowLogic): CordaFuture = this.services.startFlow(logic, this.services.newContext()).getOrThrow().resultFuture + private fun StartedNode.startFlow(logic: FlowLogic): CordaFuture { + return this.services.startFlow(logic, this.services.newContext()).flatMap { it.resultFuture } + } @After fun cleanUp() { @@ -53,14 +60,14 @@ class RetryFlowMockTest { @Test fun `Single retry`() { - assertEquals(Unit, internalNodeA.startFlow(RetryFlow(1)).get()) + assertEquals(Unit, nodeA.startFlow(RetryFlow(1)).get()) assertEquals(2, RetryFlow.count) } @Test fun `Retry forever`() { - Assertions.assertThatThrownBy { - internalNodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow() + assertThatThrownBy { + nodeA.startFlow(RetryFlow(Int.MAX_VALUE)).getOrThrow() }.isInstanceOf(LimitedRetryCausingError::class.java) assertEquals(5, RetryFlow.count) } @@ -68,14 +75,14 @@ class RetryFlowMockTest { @Test fun `Retry does not set senderUUID`() { val messagesSent = mutableListOf() - val partyB = internalNodeB.info.legalIdentities.first() - internalNodeA.setMessagingServiceSpy(object : MessagingServiceSpy(internalNodeA.network) { + val partyB = nodeB.info.legalIdentities.first() + nodeA.setMessagingServiceSpy(object : MessagingServiceSpy(nodeA.network) { override fun send(message: Message, target: MessageRecipients, sequenceKey: Any) { messagesSent.add(message) messagingService.send(message, target) } }) - internalNodeA.startFlow(SendAndRetryFlow(1, partyB)).get() + nodeA.startFlow(SendAndRetryFlow(1, partyB)).get() assertNotNull(messagesSent.first().senderUUID) assertNull(messagesSent.last().senderUUID) assertEquals(2, SendAndRetryFlow.count) @@ -83,26 +90,25 @@ class RetryFlowMockTest { @Test fun `Retry duplicate insert`() { - assertEquals(Unit, internalNodeA.startFlow(RetryInsertFlow(1)).get()) + assertEquals(Unit, nodeA.startFlow(RetryInsertFlow(1)).get()) assertEquals(2, RetryInsertFlow.count) } @Test fun `Patient records do not leak in hospital`() { - val patientCountBefore = StaffedFlowHospital.numberOfPatients - assertEquals(Unit, internalNodeA.startFlow(RetryFlow(1)).get()) + assertEquals(Unit, nodeA.startFlow(RetryFlow(1)).get()) // Need to make sure the state machine has finished. Otherwise this test is flakey. mockNet.waitQuiescent() - assertEquals(patientCountBefore, StaffedFlowHospital.numberOfPatients) + assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty() assertEquals(2, RetryFlow.count) } } -class LimitedRetryCausingError : org.hibernate.exception.ConstraintViolationException("Test message", SQLException(), "Test constraint") +class LimitedRetryCausingError : ConstraintViolationException("Test message", SQLException(), "Test constraint") class RetryCausingError : SQLException("deadlock") -class RetryFlow(val i: Int) : FlowLogic() { +class RetryFlow(private val i: Int) : FlowLogic() { companion object { var count = 0 } @@ -121,7 +127,7 @@ class RetryFlow(val i: Int) : FlowLogic() { } @InitiatingFlow -class SendAndRetryFlow(val i: Int, val other: Party) : FlowLogic() { +class SendAndRetryFlow(private val i: Int, private val other: Party) : FlowLogic() { companion object { var count = 0 } @@ -137,8 +143,9 @@ class SendAndRetryFlow(val i: Int, val other: Party) : FlowLogic() { } } +@Suppress("unused") @InitiatedBy(SendAndRetryFlow::class) -class ReceiveFlow2(val other: FlowSession) : FlowLogic() { +class ReceiveFlow2(private val other: FlowSession) : FlowLogic() { @Suspendable override fun call() { val received = other.receive().unwrap { it } @@ -146,7 +153,7 @@ class ReceiveFlow2(val other: FlowSession) : FlowLogic() { } } -class RetryInsertFlow(val i: Int) : FlowLogic() { +class RetryInsertFlow(private val i: Int) : FlowLogic() { companion object { var count = 0 } @@ -166,4 +173,4 @@ class RetryInsertFlow(val i: Int) : FlowLogic() { val tx = DBTransactionStorage.DBTransaction("Foo") contextTransaction.session.save(tx) } -} \ No newline at end of file +} diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index b234afa306..a87144157b 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -40,7 +40,6 @@ import net.corda.nodeapi.internal.persistence.HibernateConfiguration import net.corda.testing.core.singleIdentity import net.corda.testing.internal.rigorousMock import net.corda.testing.node.internal.InternalMockNetwork -import net.corda.testing.node.internal.InternalMockNodeParameters import net.corda.testing.node.internal.startFlow import org.junit.After import org.junit.Test @@ -81,8 +80,7 @@ class NodePair(private val mockNet: InternalMockNetwork) { client.services.startFlow(clientLogic) while (!serverRunning.get()) mockNet.runNetwork(1) if (rebootClient) { - client.dispose() - client = mockNet.createNode(InternalMockNodeParameters(client.internals.id)) + client = mockNet.restartNode(client) } return uncheckedCast(client.smm.allStateMachines.single().stateMachine) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index f2115e32a1..f43b356e6b 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -395,6 +395,17 @@ open class InternalMockNetwork(private val cordappPackages: List, return node } + fun restartNode(node: StartedNode, nodeFactory: (MockNodeArgs) -> N): StartedNode { + node.internals.disableDBCloseOnStop() + node.dispose() + return createNode( + InternalMockNodeParameters(legalName = node.internals.configuration.myLegalName, forcedID = node.internals.id), + nodeFactory + ) + } + + fun restartNode(node: StartedNode): StartedNode = restartNode(node, defaultFactory) + fun baseDirectory(nodeId: Int): Path = filesystem.getPath("/nodes/$nodeId") /** @@ -449,8 +460,7 @@ open class InternalMockNetwork(private val cordappPackages: List, messagingNetwork.stop() } - // Test method to block until all scheduled activity, active flows - // and network activity has ceased. + /** Block until all scheduled activity, active flows and network activity has ceased. */ fun waitQuiescent() { busyLatch.await() }