When a non-database exception is thrown out of a `withEntityManager`
block, always check if the session needs to be rolled back.
This means if a database error is caught and a new non-database error is
thrown out of the `withEntityManager` block, the transaction is still
rolled back. The flow can then continue progressing as normal.
* CORDA-3722 withEntityManager can rollback its session
Improve the handling of database transactions when using
`withEntityManager` inside a flow.
Extra changes have been included to improve the safety and
correctness of Corda around handling database transactions.
This focuses on allowing flows to catch errors that occur inside an
entity manager and handle them accordingly.
Errors can be caught in two places:
- Inside `withEntityManager`
- Outside `withEntityManager`
Further changes have been included to ensure that transactions are
rolled back correctly.
Errors caught inside `withEntityManager` require the flow to manually
`flush` the current session (the entity manager's individual session).
By manually flushing the session, a `try-catch` block can be placed
around the `flush` call, allowing possible exceptions to be caught.
Once an error is thrown from a call to `flush`, it is no longer possible
to use the same entity manager to trigger any database operations. The
only possible option is to rollback the changes from that session.
The flow can continue executing updates within the same session but they
will never be committed. What happens in this situation should be handled
by the flow. Explicitly restricting the scenario requires a lot of effort
and code. Instead, we should rely on the developer to control complex
workflows.
To continue updating the database after an error like this occurs, a new
`withEntityManager` block should be used (after catching the previous
error).
Exceptions can be caught around `withEntityManager` blocks. This allows
errors to be handled in the same way as stated above, except the need to
manually `flush` the session is removed. `withEntityManager` will
automatically `flush` a session if it has not been marked for rollback
due to an earlier error.
A `try-catch` can then be placed around the whole of the
`withEntityManager` block, allowing the error to be caught while not
committing any changes to the underlying database transaction.
To make `withEntityManager` blocks work like mini database transactions,
save points have been utilised. A new savepoint is created when opening
a `withEntityManager` block (along with a new session). It is then used
as a reference point to rollback to if the session errors and needs to
roll back. The savepoint is then released (independently from
completing successfully or failing).
Using save points means, that either all the statements inside the
entity manager are executed, or none of them are.
- A new session is created every time an entity manager is requested,
but this does not replace the flow's main underlying database session.
- `CordaPersistence.transaction` can now determine whether it needs
to execute its extra error handling code. This is needed to allow errors
escape `withEntityManager` blocks while allowing some of our exception
handling around subscribers (in `NodeVaultService`) to continue to work.
## Summary
This change deals with multiple issues:
* Errors that occur during flow initialisation.
* Errors that occur when handling the outcome of an existing flow error.
* Failures to rollback and close a database transaction when an error
occurs in `TransitionExecutorImpl`.
* Removal of create and commit transaction actions around retrying a flow.
## Errors that occur during flow initialisation
Flow initialisation has been moved into the try/catch that exists inside
`FlowStateMachineImpl.run`. This means if an error is thrown all the way
out of `initialiseFlow` (which should rarely happen) it will be caught
and move into a flow's standard error handling path. The flow should
then properly terminate.
`Event.Error` was changed to make the choice to rollback be optional.
Errors during flow initialisation cause the flow to not have a open
database transaction. Therefore there is no need to rollback.
## Errors that occur when handling the outcome of an existing flow error
When an error occurs a flow goes to the flow hospital and is given an
outcome event to address the original error. If the transition that was
processing the error outcome event (`StartErrorPropagation` and
`RetryFlowFromSafePoint`) has an error then the flow aborts and
nothing happens. This means that the flow is left in a runnable state.
To resolve this, we now retry the original error outcome event whenever
another error occurs doing so.
This is done by adding a new staff member that looks for
`ErrorStateTransitionException` thrown in the error code path of
`TransitionExecutorImpl`. It then takes the last outcome for that flow
and schedules it to run again. This scheduling runs with a backoff.
This means that a flow will continually retry the original error outcome
event until it completes it successfully.
## Failures to rollback and close a database transaction when an error occurs in `TransitionExecutorImpl`
Rolling back and closing the database transaction inside of
`TransitionExecutorImpl` is now done inside individual try/catch blocks
as this should not prevent the flow from continuing.
## Removal of create and commit transaction actions around retrying a flow
The database commit that occurs after retrying a flow can fail which
required some custom code just for that event to prevent inconsistent
behaviour. The transaction was only needed for reading checkpoints from
the database, therefore the transaction was moved into
`retryFlowFromSafePoint` instead and the commit removed.
If we need to commit data inside of `retryFlowFromSafePoint` in the
future, a commit should be added directly to `retryFlowFromSafePoint`.
The commit should occur before the flow is started on a new fiber.
* Run serialisation tests with both in-process and out-of-process nodes.
* Add custom serialisers and whitelists to Driver's AMQPServerSerializationScheme.
* CORDA-3669 Do not execute `ExecuteAsyncOperation` multiple times
When a `FlowExternalOperation` or `FlowExternalAsyncOperation` executes
and completes a flag (`isFlowResumed`) is switched to true.
This flag was used inside of `DoRemainingWorkTransition` to decide
whether to skip over the execution of an event.
Since this flag was being switched to true when the external operation's
future completed, it was possible for _unexpected_ events to be placed
in the fiber's queue that would retrigger the
`FlowIORequest.ExecuteAsyncOperation`, that is held as the checkpoint's
next `FlowIORequest`to process.
By using the existing `StateMachineState.isTransactionTracked` (and
renaming it to `isWaitingForFuture`) we can decide to not process the
`FlowIORequest.ExecuteAsyncOperation` if it has already been called
before. This moves this code path in line with
`FlowIORequest.WaitForLedgerCommit`.
Random `DoRemainingWork` events can now be pushed to the fiber's queue
without causing the `FlowIORequest.ExecuteAsyncOperation` to execute
again.
* CORDA-3651: addManifest now uses separate files for reading and writing.
* CORDA-3651: The jar scanning loader now closes itsself.
Co-authored-by: Adel El-Beik <adelel-beik@19LDN-MAC108.local>
* CORDA-3644: Scan the CorDapp classloader directly for SerializationWhitelist.
* CORDA-3644: Filter CorDapps from out-of-process node classpaths by their manifest attributes. Also exclude directories and blatant test artifacts.
* Fix IRS Demo - its "tests" artifact had a non-standard classifier of "test".
* CORDA-3484: Now cope with 2 contract jars with same hash but different name, we just select one and use that.
* ENT-3584: Contract jars are now generated on the fly.
* CORDA-3584: Reverted changes to CordappProviderImpl. Exception is raised if node started with multiple jars with same hash.
* ENT-3584: Fixing test failure.
* CORDA-3584: Switch to test extension method instead of reflection to access internal member.
* ENT-3584: Address review comment. Dont fully qualify exception.
* CORDA-3584: Address review comment and converted lazy to a resettable one.
* CORDA-3584: Removed unused logger.
* CORDA-3584: Fixed visibility.
* CORDA-3584: Removed synchronized
* CORDA-3584: Removed CordappResolver
* CORDA-3584: Reverted change in gradle file and fixed test.
* CORDA-3584: Removed V3 from test description as it wasn't actually V3 specific.
* CORDA-3584: Address review comment. Let classes be garbage collected.
* Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber
* Add explanatory comment about why we changed Observer.tee to use unsafe subscribe
* Introducing not unsubscribing version of Rx.Subscriber
* Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee
* Minor code formatting
* Make rawUpdates Rx.Observers not unsubscribe when accessed from CordaServices - Do not allow rawUpdates subscribing from flows
* Warning fix: Add else block to when statement
* Revert "Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee"
This reverts commit e419af86
* Correcting log message
* Improve log message
* Add fiber's id to log message and exception message
* Added test, asserting FlowSafeSubscriber is alive and re-accessed upon flow retry
* Logging flow name instead of flow id at VaultService.rawUpdates subscribing error
* Add kdoc to OnNextFailedException
* Minor text correction
* Update kdocs of FlowSafeSubject/ PreventSubscriptionsSubject
* Moved FlowSafeSubject under package node.internal as it is only used by NodeVaultService
* Add comment and update kdoc explaining how to subscribe with SafeSubscriber to FlowSafeSubject
* Change PreventSubscriptionsSubject#errorAction to be more specific; to return an Exception
* Minor text update
* Update messy comment
* Replace assertThat with assertEquals
* Splitting heartBeat to heartBeat1 and hearBeat2 for more clear asserting
* Correcting comment
* Update messy comment
* Splitting heartBeat into heartBeatOnNext and heartBeatOnError
* Update test name
* Add explanatory comment to test
* Update test name
* Update test and add test comment
* Moving NotarisedTxs from SendStateFlow to VaultObserverExceptionTest inside NodeHandle.getNotarisedTransactionIds
* Moving SubscribingRawUpdatesFlow from ErrorHandling to VaultObserverExceptionTest
* Update kdoc of FlowSafeSubscriber and FlowSafeSubscriber.onNext
* Make kdoc more clear
* Throw exception upon accessing VaultService.rawUpdates from within a flow
* Changing exception thrown when accessing VaultService.rawUpdates from within a flow to a CordaRuntimeException
* Minor kdoc update
* Update test comment
* Update kdoc of FlowSafeSubscriber
* Introducing Observable.flowSafeSubscribe public API method to subscribe with -non unsubscribing- Rx.Subscribers to Observables. It also replaced FlowSafeSubject
* Move CustomSafeSubscriber outside test methods
* Minor text update
* Add timeout to tests
* Update kdoc of flowSafeSubscribe
* Update kdoc of flowSafeSubscribe
* Update kdoc of flowSafeSubscribe
* Move FlowSafeSubscriber and flowSafeSubscribe under their own package
* Fix detekt issue
* Update Detekt baseline
* Revert "Update Detekt baseline"
This reverts commit 793a8ed9
* Fix Detekt issue
* Moved strictMode flag from flowSafeSubscribe to OnFlowSafeSubscribe
Moved OnFlowSafeSubscribe into internal package
Integration tested flowSafeLooseSubscribe
* Suppress Rx Deprecation
* Rename flowSafeSubscribe to flowSafeObservable
* Renaming flowSafeObservable to continueOnError and FlowSafeSubscriber to ResilientSubscriber
* [NOTICK] Add a custom detekt rule for tests with no timeout, and fix remaining missing timeouts
* [NOTICK] Add a test for custom detekt rules and tidying
* add timeout annotation to new test
Co-authored-by: Stefano Franz <roastario@gmail.com>
* TM-197 Setting bouncy castle provider in order for the test to pass
* TM-197 setting timeout for all builds at 3 hours
* TM-197 ignoring unstable tests
* TM-197 switching 4.3 to use local k8s instances and also make the maximum duration of builds 3 hours, fix 1 test and ignore 2 flaky ones
* update to use local-k8s version of the plugin
Co-authored-by: Stefano Franz <roastario@gmail.com>
* Make tee not wrap PublishSubjects in SafeSubscribers, otherwise a non Rx exception from an unsafe observer shuts down all other observers under the same PublishSubject
* Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber
* Revert "Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber"
This reverts commit c7b8af3fa6.
* Update Detekt baseline
* Just passing in the exception was printing the entire stacktrace to the log, now we just pring the exception and message
* Updating exception message
Observers registered on NodeVaultService#rawUpdates, if they throw an exception when called from serviceHub#recordTransactions and if this exception is not handled by the flow hospital, then this leads to the transaction not being recorded in the local vault. This could get the ledger in an out of sync state.
In the specific case this happens within FinalityFlow#notariseAndRecord this leads to the transaction being notarized but not recorded in the local vault nor broadcasted in any counter party. The -failed to be recorded locally- transaction and its output states are not visible to any vault, and its input states not able to consumed by a new transaction, since they are recorded as consumed within the Notary. In this specific case we need not loose, by any means, the current transaction.
We will handle all cases by catching all exceptions thrown from serviceHub#recordTransactions, wrapping them with a HospitalizeFlowException and throwing it instead. The flow will get to the hospital for observation to be retried from previous checkpoint on next node restart.
Use `flowId` from `ExternalMessageEvent` when failing to init sessions instead of generating
a new random UUID. The a `flowId` is generated and stored inside the event after the
state machine work that done previously.
Cap the default size of the external operation thread pool to 10 or
the maximum number of available processors, whichever is smaller.
Set the minimum size of the thread pool to 1. Meaning that only a
single thread is used unless the node actually starts to use
`FlowExternalOperation` which consumes threads from this pool.
* CORDA-2942: Allow exception from `CordaService` creation to propagate
It will ultimately be thrown from Node's `start()` method terminating the node start-up sequence.
* CORDA-2942: Be lenient when retrievign the name of the Notary
Some tests setup such that they do nto have Notary running.
* CORDA-3549: Improve stability of `CordaServiceLifecycleFatalTests`
* CORDA-3549: Bump-up reps count to ensure that test is definitely not flaky when executed by CI
(once proved the number of reps will be reduced)
* CORDA-3549: Making Detekt happier
* CORDA-2942: Ensure `NodeLifecycleEventsDistributor` cleans-up smoothly when node shuts down
Deprecate FlowAsyncOperation and reimplement public versions FlowExternalOperation and FlowExternalAsyncOperation.
await added to FlowLogic to allow easy calling from both Java and Kotlin. There are two overrides of await (one for FlowExternalOperation and FlowExternalAsyncOperation).
Implementations of FlowExternalOperation return a result (written as blocking code) from their execute function. This operation will then be executed using a thread provided by the externalOperationExecutor.
Implementations of FlowExternalAsyncOperation return a future from their execute function. This operation must be executed on a newly spawned thread or one provided by a thread pool. It is up to developers to handle threading in this scenario.
The default thread pool (externalOperationExecutor) can be configured through the flowExternalOperationThreadPoolSize node config.
The current implementation leaves FlowAsyncOperation alone, meaning that any developers that have used it (even though it is internal) won't need to change their apps. If this was not concern I would delete it completely and replumb the state machine code. Instead, it has been marked with @DoNotImplement and executeAsync is annotated with @Deprecated
* CORDA-2942: Port minimal set of changes to make lifecycle events work
... and make codebase compile.
* CORDA-2942: Undo some changes which are not strictly speaking necessary
* CORDA-2942: Make `NodeServicesContext` leaner and delete `extensions-api` module
* CORDA-2942: Reduce even more number of files affected
* CORDA-2942: Integration test fix
* CORDA-2942: Make events `AfterStart` and `BeforeStop` generic w.r.t. `NodeServicesContext`
* CORDA-2942: `NodeLifecycleObserverService` and a set of integration tests.
Public API violations are expected as well as integration tests failing.
* CORDA-2942: Re-work to introduce `ServiceLifecycleObserver`
* CORDA-2942: Explicitly mention a type of exception that may be thrown for some events.
* CORDA-2942: Register `ServiceLifecycleObserver` through `AppServiceHub`
* CORDA-2942: Fix integration test + KDocs update
* CORDA-2942: Detekt and `api-current` update
* CORDA-2942: Improvement to `CordaServiceLifecycleFatalTests`
... or else it has side effects on other tests.
* CORDA-2942: Add an integration test for new API use in Java
Driver test is written in Kotlin, but services definition is written in Java.
Also KDocs improvements.
* CORDA-2942: Documentation and release notes update
* CORDA-2942: First set of changes following review by @mnesbit
* CORDA-2942: Second set of changes following review by @mnesbit
* CORDA-2942: Added multi-threaded test
* CORDA-2942: Fixes
* CORDA-2942: Undo changes to `api-current.txt`
* CORDA-2942: Bare mimimum change to `api-current.txt` for CI gate to pass.
* CORDA-2942: Address review feedback from @rick-r3
* CORDA-2942: Detekt update
* CORDA-2942: Delete `ServiceLifecycleObserverPriority` and replace it with `Int` after discussion with @mnesbit
* CORDA-2942: Introduce more `NodeLifecycleEvent` and switch services to listen for those events
* CORDA-2942: Few more changes after input from @rick-r3
* First stub on integration test
Unfinished - hang on issue and pay
* CORDA-2942: Switch to use out-of-process nodes for the inetgration test
Currently Alice and Notary stuck waiting to hear from each other.
* CORDA-2942: Extra log lines during event distribution
* CORDA-2942: Asynchronously distribute lifecycle events
* CORDA-2942: Await for complete P2P client start-up
Next step: Add vault query to integration test
* CORDA-2942: Asynchronously distribute lifecycle events
Next step: Improve integration test
* CORDA-2942: Fix test broken by recent changes and improve logging
* CORDA-2942: Improvement of the test to be able to monitor actions performed by @CordaService in the remote process
* CORDA-2942: Add node re-start step to the integration test
* CORDA-2942: Remove `CORDAPP_STOPPED` event for now
* CORDA-2942: s/CORDAPP_STARTED/STATE_MACHINE_STARTED/
* CORDA-2942: Inverse the meaning of `priority` as requested by @rick-r3
* CORDA-2942: Register `AppServiceHubImpl` for lifecycle events and put a warning when SMM is not ready.