mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
CORDA-3381: Errors in vault updates publisher are unsubscribing stopping observers from working (#5912)
* Throw SQLException or PersistenceException plain, that may come out of an unsafe subscriber * Add explanatory comment about why we changed Observer.tee to use unsafe subscribe * Introducing not unsubscribing version of Rx.Subscriber * Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee * Minor code formatting * Make rawUpdates Rx.Observers not unsubscribe when accessed from CordaServices - Do not allow rawUpdates subscribing from flows * Warning fix: Add else block to when statement * Revert "Wrap PublishSubjects with FlowSafeSubjects in all tests that test Observer.tee" This reverts commit e419af86 * Correcting log message * Improve log message * Add fiber's id to log message and exception message * Added test, asserting FlowSafeSubscriber is alive and re-accessed upon flow retry * Logging flow name instead of flow id at VaultService.rawUpdates subscribing error * Add kdoc to OnNextFailedException * Minor text correction * Update kdocs of FlowSafeSubject/ PreventSubscriptionsSubject * Moved FlowSafeSubject under package node.internal as it is only used by NodeVaultService * Add comment and update kdoc explaining how to subscribe with SafeSubscriber to FlowSafeSubject * Change PreventSubscriptionsSubject#errorAction to be more specific; to return an Exception * Minor text update * Update messy comment * Replace assertThat with assertEquals * Splitting heartBeat to heartBeat1 and hearBeat2 for more clear asserting * Correcting comment * Update messy comment * Splitting heartBeat into heartBeatOnNext and heartBeatOnError * Update test name * Add explanatory comment to test * Update test name * Update test and add test comment * Moving NotarisedTxs from SendStateFlow to VaultObserverExceptionTest inside NodeHandle.getNotarisedTransactionIds * Moving SubscribingRawUpdatesFlow from ErrorHandling to VaultObserverExceptionTest * Update kdoc of FlowSafeSubscriber and FlowSafeSubscriber.onNext * Make kdoc more clear * Throw exception upon accessing VaultService.rawUpdates from within a flow * Changing exception thrown when accessing VaultService.rawUpdates from within a flow to a CordaRuntimeException * Minor kdoc update * Update test comment * Update kdoc of FlowSafeSubscriber * Introducing Observable.flowSafeSubscribe public API method to subscribe with -non unsubscribing- Rx.Subscribers to Observables. It also replaced FlowSafeSubject * Move CustomSafeSubscriber outside test methods * Minor text update * Add timeout to tests * Update kdoc of flowSafeSubscribe * Update kdoc of flowSafeSubscribe * Update kdoc of flowSafeSubscribe * Move FlowSafeSubscriber and flowSafeSubscribe under their own package * Fix detekt issue * Update Detekt baseline * Revert "Update Detekt baseline" This reverts commit 793a8ed9 * Fix Detekt issue * Moved strictMode flag from flowSafeSubscribe to OnFlowSafeSubscribe Moved OnFlowSafeSubscribe into internal package Integration tested flowSafeLooseSubscribe * Suppress Rx Deprecation * Rename flowSafeSubscribe to flowSafeObservable * Renaming flowSafeObservable to continueOnError and FlowSafeSubscriber to ResilientSubscriber
This commit is contained in:
parent
9dec5aac4b
commit
2c9c2985c0
@ -192,6 +192,9 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
|
||||
@DeleteForDJVM
|
||||
fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
|
||||
val subject = PublishSubject.create<T>()
|
||||
// use unsafe subscribe, so that the teed subscribers will not get wrapped with SafeSubscribers,
|
||||
// therefore a potential raw exception (non Rx) coming from a child -unsafe subscribed- observer
|
||||
// will not unsubscribe all of the subscribers under the PublishSubject.
|
||||
subject.unsafeSubscribe(Subscribers.from(this))
|
||||
teeTo.forEach { subject.unsafeSubscribe(Subscribers.from(it)) }
|
||||
return subject
|
||||
|
@ -0,0 +1,13 @@
|
||||
@file:JvmName("Observables")
|
||||
package net.corda.core.observable
|
||||
|
||||
import net.corda.core.observable.internal.OnResilientSubscribe
|
||||
import rx.Observable
|
||||
|
||||
/**
|
||||
* [Observable.continueOnError] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s
|
||||
* to the source [Observable]. Namely, it makes the [rx.Observer]s resilient to exceptions coming out of [rx.Observer.onNext].
|
||||
*
|
||||
* [Observable.continueOnError] should be called before every subscribe to have the aforementioned effect.
|
||||
*/
|
||||
fun <T> Observable<T>.continueOnError(): Observable<T> = Observable.unsafeCreate(OnResilientSubscribe(this, true))
|
@ -0,0 +1,120 @@
|
||||
package net.corda.core.observable.internal
|
||||
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import rx.Observable
|
||||
import rx.Observer
|
||||
import rx.Subscriber
|
||||
import rx.exceptions.CompositeException
|
||||
import rx.exceptions.Exceptions
|
||||
import rx.exceptions.OnErrorFailedException
|
||||
import rx.exceptions.OnErrorNotImplementedException
|
||||
import rx.internal.util.ActionSubscriber
|
||||
import rx.observers.SafeSubscriber
|
||||
import rx.plugins.RxJavaHooks
|
||||
import rx.plugins.RxJavaPlugins
|
||||
import rx.subjects.Subject
|
||||
|
||||
/**
|
||||
* Extends [SafeSubscriber] to override [SafeSubscriber.onNext], [SafeSubscriber.onError] and [SafeSubscriber._onError].
|
||||
*
|
||||
* [ResilientSubscriber] will not set [SafeSubscriber.done] flag to true nor will call [SafeSubscriber.unsubscribe] upon
|
||||
* error inside [Observer.onNext]. This way, the [ResilientSubscriber] will not get unsubscribed and therefore the underlying [Observer]
|
||||
* will not get removed.
|
||||
*
|
||||
* An [Observer] that will not get removed due to errors in [onNext] events becomes useful when an unsubscribe could
|
||||
* lead to a malfunctioning CorDapp, due to a single isolated error. If the [Observer] gets removed,
|
||||
* it will no longer be available the next time any events are pushed from the base [Subject].
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class ResilientSubscriber<T>(actual: Subscriber<in T>) : SafeSubscriber<T>(actual) {
|
||||
|
||||
/**
|
||||
* Duplicate of [SafeSubscriber.onNext]. However, it ignores [SafeSubscriber.done] flag.
|
||||
* It only delegates to [SafeSubscriber.onError] if it wraps an [ActionSubscriber] which is
|
||||
* a leaf in an Subscribers' tree structure.
|
||||
*/
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
override fun onNext(t: T) {
|
||||
try {
|
||||
actual.onNext(t)
|
||||
} catch (e: Throwable) {
|
||||
if (actual is ActionSubscriber) {
|
||||
// this Subscriber wraps an ActionSubscriber which is always a leaf Observer, then call user-defined onError
|
||||
Exceptions.throwOrReport(e, this)
|
||||
} else {
|
||||
// this Subscriber may wrap a non leaf Observer. In case the wrapped Observer is a PublishSubject then we
|
||||
// should not call onError because PublishSubjectState.onError will shut down all of the Observers under it
|
||||
throw OnNextFailedException(
|
||||
"Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped", e
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Duplicate of [SafeSubscriber.onError]. However, it will not set [SafeSubscriber.done] flag to true.
|
||||
*/
|
||||
override fun onError(e: Throwable) {
|
||||
Exceptions.throwIfFatal(e)
|
||||
_onError(e)
|
||||
}
|
||||
|
||||
/**
|
||||
* Duplicate of [SafeSubscriber._onError]. However, it will not call [Subscriber.unsubscribe].
|
||||
*/
|
||||
@Suppress("TooGenericExceptionCaught")
|
||||
override fun _onError(e: Throwable) {
|
||||
@Suppress("DEPRECATION")
|
||||
RxJavaPlugins.getInstance().errorHandler.handleError(e)
|
||||
try {
|
||||
actual.onError(e)
|
||||
} catch (e: OnErrorNotImplementedException) {
|
||||
throw e
|
||||
} catch (e2: Throwable) {
|
||||
RxJavaHooks.onError(e2)
|
||||
throw OnErrorFailedException(
|
||||
"Error occurred when trying to propagate error to Observer.onError", CompositeException(listOf(e, e2))
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We throw [OnNextFailedException] to pass the exception back through the preceding [Subscriber] chain
|
||||
* without triggering any [SafeSubscriber.onError]s. Since we are extending an [OnErrorNotImplementedException]
|
||||
* the exception will be re-thrown at [Exceptions.throwOrReport].
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause)
|
||||
|
||||
/**
|
||||
* [OnResilientSubscribe] returns an [Observable] holding a reference to the source [Observable]. Upon subscribing to it,
|
||||
* when reaching [call] method, if the subscriber passed in [isSafeSubscriber] it will unwrap the [Observer] from
|
||||
* the [SafeSubscriber], re-wrap it with [ResilientSubscriber] and then subscribe it to the source [Observable].
|
||||
*
|
||||
* In case we need to subscribe with a [SafeSubscriber] to the source [Observable] via [OnResilientSubscribe], we have to:
|
||||
* 1. Declare a custom SafeSubscriber extending [SafeSubscriber].
|
||||
* 2. Wrap our [rx.Observer] -to be subscribed to the source [Observable]- with the custom SafeSubscriber.
|
||||
* 3. Create a [OnResilientSubscribe] object with [strictMode] = false.
|
||||
* 3. Call [Observable.unsafeCreate] passing in as argument the [OnResilientSubscribe].
|
||||
* 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber.
|
||||
*/
|
||||
class OnResilientSubscribe<T>(val source: Observable<T>, private val strictMode: Boolean): Observable.OnSubscribe<T> {
|
||||
|
||||
override fun call(subscriber: Subscriber<in T>) {
|
||||
if (isSafeSubscriber(subscriber)) {
|
||||
source.unsafeSubscribe(ResilientSubscriber((subscriber as SafeSubscriber).actual))
|
||||
} else {
|
||||
source.unsafeSubscribe(subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean {
|
||||
return if (strictMode) {
|
||||
// In strictMode mode we capture SafeSubscriber subclasses as well
|
||||
SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java)
|
||||
} else {
|
||||
subscriber::class == SafeSubscriber::class
|
||||
}
|
||||
}
|
||||
}
|
@ -1,34 +1,49 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.strands.concurrent.Semaphore
|
||||
import com.r3.dbfailure.contracts.DbFailureContract
|
||||
import com.r3.dbfailure.workflows.CreateStateFlow
|
||||
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
|
||||
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
|
||||
import com.r3.dbfailure.workflows.DbListenerService
|
||||
import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow
|
||||
import com.r3.dbfailure.workflows.SendStateFlow
|
||||
import com.r3.transactionfailure.workflows.ErrorHandling
|
||||
import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeHandle
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.findCordapp
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.After
|
||||
import org.junit.Assert
|
||||
import org.junit.Test
|
||||
import java.lang.IllegalStateException
|
||||
import java.sql.SQLException
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
import javax.persistence.PersistenceException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
@ -49,6 +64,10 @@ class VaultObserverExceptionTest {
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||
StaffedFlowHospital.onFlowAdmitted.clear()
|
||||
DbListenerService.onError = null
|
||||
DbListenerService.safeSubscription = true
|
||||
DbListenerService.onNextVisited = {}
|
||||
DbListenerService.onErrorVisited = null
|
||||
DbListenerService.withCustomSafeSubscriber = false
|
||||
}
|
||||
|
||||
/**
|
||||
@ -74,9 +93,43 @@ class VaultObserverExceptionTest {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(
|
||||
::Initiator,
|
||||
"Syntax Error in Custom SQL",
|
||||
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
|
||||
CreateStateFlow::Initiator,
|
||||
"Syntax Error in Custom SQL",
|
||||
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
|
||||
).returnValue.then { testControlFuture.complete(false) }
|
||||
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
|
||||
|
||||
Assert.assertTrue(foundExpectedException)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Causing an SqlException via a syntax error in a vault observer causes the flow to hit the
|
||||
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation - Unsafe subscribe
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun unhandledSqlExceptionFromVaultObserverGetsHospitalisedUnsafeSubscription() {
|
||||
DbListenerService.safeSubscription = false
|
||||
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
|
||||
|
||||
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
|
||||
when (it) {
|
||||
is SQLException -> {
|
||||
testControlFuture.complete(true)
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
cordappsForAllNodes = testCordapps())) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(
|
||||
CreateStateFlow::Initiator,
|
||||
"Syntax Error in Custom SQL",
|
||||
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
|
||||
).returnValue.then { testControlFuture.complete(false) }
|
||||
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
|
||||
|
||||
@ -103,7 +156,7 @@ class VaultObserverExceptionTest {
|
||||
cordappsForAllNodes = testCordapps())) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
|
||||
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
|
||||
@ -131,9 +184,9 @@ class VaultObserverExceptionTest {
|
||||
cordappsForAllNodes = testCordapps())) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
|
||||
}
|
||||
|
||||
@ -167,7 +220,7 @@ class VaultObserverExceptionTest {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
assertFailsWith<TimeoutException>("PersistenceException") {
|
||||
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
|
||||
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.TxInvalidState))
|
||||
.returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
@ -200,7 +253,7 @@ class VaultObserverExceptionTest {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(
|
||||
::Initiator, "EntityManager",
|
||||
CreateStateFlow::Initiator, "EntityManager",
|
||||
CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.TxInvalidState,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
@ -228,7 +281,7 @@ class VaultObserverExceptionTest {
|
||||
cordappsForAllNodes = testCordapps())) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
val flowResult = flowHandle.returnValue
|
||||
@ -251,7 +304,7 @@ class VaultObserverExceptionTest {
|
||||
cordappsForAllNodes = testCordapps())) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
|
||||
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
|
||||
val flowResult = flowHandle.returnValue
|
||||
@ -281,20 +334,20 @@ class VaultObserverExceptionTest {
|
||||
}
|
||||
|
||||
driver(DriverParameters(
|
||||
inMemoryDB = false,
|
||||
startNodesInProcess = true,
|
||||
isDebug = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
inMemoryDB = false,
|
||||
startNodesInProcess = true,
|
||||
isDebug = true,
|
||||
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")))) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
|
||||
node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors
|
||||
)
|
||||
)
|
||||
)
|
||||
waitUntilHospitalised.acquire()
|
||||
|
||||
@ -329,9 +382,9 @@ class VaultObserverExceptionTest {
|
||||
cordappsForAllNodes = testCordapps())) {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
|
||||
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
|
||||
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
|
||||
}
|
||||
|
||||
@ -344,7 +397,7 @@ class VaultObserverExceptionTest {
|
||||
val aliceUser = User("user", "foo", setOf(Permissions.all()))
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow()
|
||||
aliceNode.rpc.startFlow(::MakeServiceThrowErrorFlow).returnValue.getOrThrow()
|
||||
aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
|
||||
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError))
|
||||
|
||||
val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
|
||||
@ -363,4 +416,459 @@ class VaultObserverExceptionTest {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
|
||||
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
|
||||
* counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
|
||||
*
|
||||
* Check onNext is visited the correct number of times.
|
||||
*
|
||||
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext check`() {
|
||||
var observationCounter = 0
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
|
||||
|
||||
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
|
||||
DbListenerService.onNextVisited = { party ->
|
||||
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
|
||||
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
|
||||
}
|
||||
}
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
|
||||
|
||||
val startErrorInObservableWhenConsumingState = {
|
||||
|
||||
val stateId = aliceNode.rpc.startFlow(
|
||||
CreateStateFlow::Initiator,
|
||||
"AllGood",
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
|
||||
).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
println("Created new state")
|
||||
|
||||
val flowHandle = aliceNode.rpc.startFlow(
|
||||
SendStateFlow::PassErroneousOwnableState, // throws at consumed state -> should end up in hospital -> flow should hang
|
||||
stateId,
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
|
||||
bobNode.nodeInfo.legalIdentities.first()
|
||||
)
|
||||
|
||||
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
|
||||
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
|
||||
|
||||
stateId
|
||||
}
|
||||
|
||||
assertEquals(0, notary.getNotarisedTransactionIds().size)
|
||||
|
||||
println("First set of flows")
|
||||
val stateId = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
|
||||
println("Second set of flows")
|
||||
val stateId2 = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(2, observationCounter)
|
||||
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
|
||||
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
|
||||
* counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
|
||||
*
|
||||
* Check onNext and onError are visited the correct number of times.
|
||||
*
|
||||
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext and onError check`() {
|
||||
var observationCounter = 0
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
|
||||
|
||||
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
|
||||
DbListenerService.onNextVisited = { party ->
|
||||
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
|
||||
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
|
||||
}
|
||||
}
|
||||
|
||||
DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it}
|
||||
DbListenerService.onErrorVisited = { party ->
|
||||
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
|
||||
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
|
||||
}
|
||||
}
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
|
||||
|
||||
val startErrorInObservableWhenConsumingState = {
|
||||
|
||||
val stateId = aliceNode.rpc.startFlow(
|
||||
CreateStateFlow::Initiator,
|
||||
"AllGood",
|
||||
// should be a hospital exception
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
|
||||
).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
val flowHandle = aliceNode.rpc.startFlow(
|
||||
SendStateFlow::PassErroneousOwnableState,
|
||||
stateId,
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
|
||||
bobNode.nodeInfo.legalIdentities.first()
|
||||
)
|
||||
|
||||
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
|
||||
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
|
||||
|
||||
stateId
|
||||
}
|
||||
|
||||
assertEquals(0, notary.getNotarisedTransactionIds().size)
|
||||
|
||||
val stateId = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
|
||||
val stateId2 = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(2, observationCounter)
|
||||
assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the counterparty node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent.
|
||||
* Observer events are recorded on both the initiating node and the counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
|
||||
*
|
||||
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Throw user error in VaultService rawUpdates during counterparty FinalityFlow blows up the flow but does not break the Observer`() {
|
||||
var observationCounter = 0
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
|
||||
|
||||
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
|
||||
DbListenerService.onNextVisited = { party ->
|
||||
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
|
||||
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
|
||||
}
|
||||
}
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
|
||||
|
||||
val startErrorInObservableWhenCreatingSecondState = {
|
||||
|
||||
val stateId = aliceNode.rpc.startFlow(
|
||||
CreateStateFlow::Initiator,
|
||||
"AllGood",
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError)
|
||||
).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
aliceNode.rpc.startFlow(
|
||||
SendStateFlow::PassErroneousOwnableState,
|
||||
stateId,
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError),
|
||||
bobNode.nodeInfo.legalIdentities.first()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
|
||||
stateId
|
||||
}
|
||||
|
||||
assertEquals(0, notary.getNotarisedTransactionIds().size)
|
||||
|
||||
val stateId = startErrorInObservableWhenCreatingSecondState()
|
||||
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(1, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()])
|
||||
|
||||
val stateId2 = startErrorInObservableWhenCreatingSecondState()
|
||||
assertEquals(1, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(2, observationCounter)
|
||||
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(2, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()])
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node.
|
||||
*
|
||||
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
|
||||
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
|
||||
* counterparty node.
|
||||
*
|
||||
* More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe.
|
||||
*
|
||||
* This test causes 2 failures inside of the [rx.Observer] to ensure that the Observer is still subscribed.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer`() {
|
||||
var observationCounter = 0
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
|
||||
|
||||
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
|
||||
DbListenerService.onNextVisited = { party ->
|
||||
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
|
||||
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
|
||||
}
|
||||
}
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
|
||||
|
||||
val startErrorInObservableWhenConsumingState = {
|
||||
|
||||
val stateId = aliceNode.rpc.startFlow(
|
||||
CreateStateFlow::Initiator,
|
||||
"AllGood",
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
|
||||
).returnValue.getOrThrow(30.seconds)
|
||||
|
||||
val flowHandle = aliceNode.rpc.startFlow(
|
||||
SendStateFlow::PassErroneousOwnableState,
|
||||
stateId,
|
||||
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
|
||||
bobNode.nodeInfo.legalIdentities.first()
|
||||
)
|
||||
|
||||
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
|
||||
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
|
||||
|
||||
stateId
|
||||
}
|
||||
|
||||
assertEquals(0, notary.getNotarisedTransactionIds().size)
|
||||
|
||||
val stateId = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(1, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(1, observationCounter)
|
||||
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
|
||||
val stateId2 = startErrorInObservableWhenConsumingState()
|
||||
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
|
||||
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
|
||||
assertEquals(2, notary.getNotarisedTransactionIds().size)
|
||||
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
|
||||
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `Accessing NodeVaultService rawUpdates from a flow is not allowed` () {
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")
|
||||
),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
val flowHandle = aliceNode.rpc.startFlow(::SubscribingRawUpdatesFlow)
|
||||
|
||||
assertFailsWith<CordaRuntimeException>(
|
||||
"Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to access VaultService.rawUpdates " +
|
||||
"- Rx.Observables should only be accessed outside the context of a flow "
|
||||
) {
|
||||
flowHandle.returnValue.getOrThrow(30.seconds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `Failing Observer wrapped with ResilientSubscriber will survive and be re-called upon flow retry`() {
|
||||
var onNextCount = 0
|
||||
var onErrorCount = 0
|
||||
DbListenerService.onNextVisited = { _ -> onNextCount++ }
|
||||
DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it}
|
||||
DbListenerService.onErrorVisited = { _ -> onErrorCount++ }
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
assertFailsWith<TimeoutException> {
|
||||
aliceNode.rpc.startFlow(
|
||||
ErrorHandling::CheckpointAfterErrorFlow,
|
||||
CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors
|
||||
)
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
}
|
||||
|
||||
assertEquals(4, onNextCount)
|
||||
assertEquals(4, onErrorCount)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `Users may subscribe to NodeVaultService rawUpdates with their own custom SafeSubscribers`() {
|
||||
var onNextCount = 0
|
||||
DbListenerService.onNextVisited = { _ -> onNextCount++ }
|
||||
|
||||
val user = User("user", "foo", setOf(Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true,
|
||||
cordappsForAllNodes = listOf(
|
||||
findCordapp("com.r3.dbfailure.contracts"),
|
||||
findCordapp("com.r3.dbfailure.workflows"),
|
||||
findCordapp("com.r3.transactionfailure.workflows"),
|
||||
findCordapp("com.r3.dbfailure.schemas")),
|
||||
inMemoryDB = false)
|
||||
) {
|
||||
// Subscribing with custom SafeSubscriber; the custom SafeSubscriber will not get replaced by a ResilientSubscriber
|
||||
// meaning that it will behave as a SafeSubscriber; it will get unsubscribed upon throwing an error.
|
||||
// Because we throw a ConstraintViolationException, the Rx Observer will get unsubscribed but the flow will retry
|
||||
// from previous checkpoint, however the Observer will no longer be there.
|
||||
DbListenerService.withCustomSafeSubscriber = true
|
||||
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
|
||||
|
||||
aliceNode.rpc.startFlow(
|
||||
ErrorHandling::CheckpointAfterErrorFlow,
|
||||
CreateStateFlow.errorTargetsToNum(
|
||||
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException,
|
||||
CreateStateFlow.ErrorTarget.FlowSwallowErrors
|
||||
)
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
|
||||
assertEquals(1, onNextCount)
|
||||
}
|
||||
}
|
||||
|
||||
private fun NodeHandle.getNotarisedTransactionIds(): List<String> {
|
||||
|
||||
@StartableByRPC
|
||||
class NotarisedTxs : FlowLogic<List<String>>() {
|
||||
override fun call(): List<String> {
|
||||
val session = serviceHub.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute("SELECT TRANSACTION_ID FROM NODE_NOTARY_COMMITTED_TXS;")
|
||||
val result = mutableListOf<String>()
|
||||
while (statement.resultSet.next()) {
|
||||
result.add(statement.resultSet.getString(1))
|
||||
}
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
return rpc.startFlowDynamic(NotarisedTxs::class.java).returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
private fun NodeHandle.getStatesById(id: UniqueIdentifier?, status: Vault.StateStatus): List<StateAndRef<DbFailureContract.TestState>> {
|
||||
return rpc.vaultQueryByCriteria(
|
||||
QueryCriteria.LinearStateQueryCriteria(
|
||||
linearId = if (id != null) listOf(id) else null,
|
||||
status = status
|
||||
), DbFailureContract.TestState::class.java
|
||||
).states
|
||||
}
|
||||
|
||||
private fun NodeHandle.getAllStates(status: Vault.StateStatus): List<StateAndRef<DbFailureContract.TestState>> {
|
||||
return getStatesById(null, status)
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class SubscribingRawUpdatesFlow: FlowLogic<Unit>() {
|
||||
override fun call() {
|
||||
logger.info("Accessing rawUpdates within a flow will throw! ")
|
||||
val rawUpdates = serviceHub.vaultService.rawUpdates // throws
|
||||
logger.info("Code flow should never reach this logging or the following segment! ")
|
||||
rawUpdates.subscribe {
|
||||
println("Code flow should never get in here!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -2,6 +2,7 @@ package net.corda.node.services.vault
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.containsAny
|
||||
@ -13,6 +14,7 @@ import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.core.observable.internal.OnResilientSubscribe
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.*
|
||||
@ -209,7 +211,27 @@ class NodeVaultService(
|
||||
}
|
||||
|
||||
override val rawUpdates: Observable<Vault.Update<ContractState>>
|
||||
get() = mutex.locked { _rawUpdatesPublisher }
|
||||
get() = mutex.locked {
|
||||
FlowStateMachineImpl.currentStateMachine()?.let {
|
||||
// we are inside a flow; we cannot allow flows to subscribe Rx Observers,
|
||||
// because the Observer could reference flow's properties, essentially fiber's properties then,
|
||||
// since it does not unsubscribe on flow's/ fiber's completion,
|
||||
// it could prevent the flow/ fiber -object- get garbage collected.
|
||||
log.error(
|
||||
"Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " +
|
||||
"- Rx.Observables should only be accessed outside the context of a flow " +
|
||||
"- aborting the flow "
|
||||
)
|
||||
|
||||
throw CordaRuntimeException(
|
||||
"Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " +
|
||||
"- Rx.Observables should only be accessed outside the context of a flow "
|
||||
)
|
||||
}
|
||||
// we are not inside a flow, we are most likely inside a CordaService;
|
||||
// we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.
|
||||
return _rawUpdatesPublisher.resilientOnError()
|
||||
}
|
||||
|
||||
override val updates: Observable<Vault.Update<ContractState>>
|
||||
get() = mutex.locked { _updatesInDbTx }
|
||||
@ -414,7 +436,7 @@ class NodeVaultService(
|
||||
HospitalizeFlowException(wrapped)
|
||||
}
|
||||
}
|
||||
} ?: HospitalizeFlowException(e)
|
||||
} ?: (e as? SQLException ?: (e as? PersistenceException ?: HospitalizeFlowException(e)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -795,4 +817,7 @@ class NodeVaultService(
|
||||
}
|
||||
return myTypes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** The Observable returned allows subscribing with custom SafeSubscribers to source [Observable]. */
|
||||
internal fun<T> Observable<T>.resilientOnError(): Observable<T> = Observable.unsafeCreate(OnResilientSubscribe(this, false))
|
@ -3,6 +3,10 @@ package net.corda.node.utilities
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.internal.bufferUntilSubscribed
|
||||
import net.corda.core.internal.tee
|
||||
import net.corda.core.observable.internal.ResilientSubscriber
|
||||
import net.corda.core.observable.internal.OnNextFailedException
|
||||
import net.corda.core.observable.continueOnError
|
||||
import net.corda.node.services.vault.resilientOnError
|
||||
import net.corda.nodeapi.internal.persistence.*
|
||||
import net.corda.testing.internal.configureDatabase
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
@ -10,9 +14,17 @@ import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import rx.Subscriber
|
||||
import rx.exceptions.CompositeException
|
||||
import rx.exceptions.OnErrorFailedException
|
||||
import rx.exceptions.OnErrorNotImplementedException
|
||||
import rx.internal.util.ActionSubscriber
|
||||
import rx.observers.SafeSubscriber
|
||||
import rx.observers.Subscribers
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.Closeable
|
||||
import java.lang.IllegalArgumentException
|
||||
import java.lang.IllegalStateException
|
||||
import java.lang.RuntimeException
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
@ -194,7 +206,7 @@ class ObservablesTests {
|
||||
* tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
|
||||
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
|
||||
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
|
||||
* eventually shut down all of the subscribers under that PublishSubjectState.
|
||||
* eventually shut down all of the subscribers under that PublishSubject.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
|
||||
@ -214,6 +226,200 @@ class ObservablesTests {
|
||||
assertEquals(2, count)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `continueOnError subscribes ResilientSubscribers, wrapped Observers will survive errors from onNext`() {
|
||||
var heartBeat1 = 0
|
||||
var heartBeat2 = 0
|
||||
val source = PublishSubject.create<Int>()
|
||||
val continueOnError = source.continueOnError()
|
||||
continueOnError.subscribe { runNo ->
|
||||
// subscribes with a ResilientSubscriber
|
||||
heartBeat1++
|
||||
if (runNo == 1) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
}
|
||||
continueOnError.subscribe { runNo ->
|
||||
// subscribes with a ResilientSubscriber
|
||||
heartBeat2++
|
||||
if (runNo == 2) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
}
|
||||
|
||||
assertFailsWith<OnErrorNotImplementedException> {
|
||||
source.onNext(1) // first observer only will run and throw
|
||||
}
|
||||
assertFailsWith<OnErrorNotImplementedException> {
|
||||
source.onNext(2) // first observer will run, second observer will run and throw
|
||||
}
|
||||
source.onNext(3) // both observers will run
|
||||
assertEquals(3, heartBeat1)
|
||||
assertEquals(2, heartBeat2)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `PublishSubject unsubscribes ResilientSubscribers only upon explicitly calling onError`() {
|
||||
var heartBeat = 0
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.continueOnError().subscribe { heartBeat += it }
|
||||
source.continueOnError().subscribe { heartBeat += it }
|
||||
source.onNext(1)
|
||||
// send an onError event
|
||||
assertFailsWith<CompositeException> {
|
||||
source.onError(IllegalStateException()) // all ResilientSubscribers under PublishSubject get unsubscribed here
|
||||
}
|
||||
source.onNext(1)
|
||||
assertEquals(2, heartBeat)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `PublishSubject wrapped with a SafeSubscriber shuts down the whole structure, if one of them is unsafe and it throws`() {
|
||||
var heartBeat = 0
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.unsafeSubscribe(Subscribers.create { runNo -> // subscribes unsafe; It does not wrap with ResilientSubscriber
|
||||
heartBeat++
|
||||
if (runNo == 1) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
})
|
||||
source.continueOnError().subscribe { heartBeat += it }
|
||||
// wrapping PublishSubject with a SafeSubscriber
|
||||
val sourceWrapper = SafeSubscriber(Subscribers.from(source))
|
||||
assertFailsWith<OnErrorFailedException> {
|
||||
sourceWrapper.onNext(1)
|
||||
}
|
||||
sourceWrapper.onNext(2)
|
||||
assertEquals(1, heartBeat)
|
||||
}
|
||||
|
||||
/**
|
||||
* A [ResilientSubscriber] that is NOT a leaf in a subscribers structure will not call [onError]
|
||||
* if an error occurs during its [onNext] event processing.
|
||||
*
|
||||
* The reason why it should not call its onError is: if it wraps a [PublishSubject], calling [ResilientSubscriber.onError]
|
||||
* will then call [PublishSubject.onError] which will shut down all the subscribers under the [PublishSubject].
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `PublishSubject wrapped with a ResilientSubscriber will preserve the structure, if one of its children subscribers is unsafe and it throws`() {
|
||||
var heartBeat = 0
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.unsafeSubscribe(Subscribers.create { runNo ->
|
||||
heartBeat++
|
||||
if (runNo == 1) {
|
||||
throw IllegalStateException()
|
||||
}
|
||||
})
|
||||
source.continueOnError().subscribe { heartBeat++ }
|
||||
// wrap PublishSubject with a ResilientSubscriber
|
||||
val sourceWrapper = ResilientSubscriber(Subscribers.from(source))
|
||||
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped") {
|
||||
sourceWrapper.onNext(1)
|
||||
}
|
||||
sourceWrapper.onNext(2)
|
||||
assertEquals(3, heartBeat)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `throwing inside onNext of a ResilientSubscriber leaf subscriber will call onError`() {
|
||||
var heartBeatOnNext = 0
|
||||
var heartBeatOnError = 0
|
||||
val source = PublishSubject.create<Int>()
|
||||
// add a leaf ResilientSubscriber
|
||||
source.continueOnError().subscribe({
|
||||
heartBeatOnNext++
|
||||
throw IllegalStateException()
|
||||
}, {
|
||||
heartBeatOnError++
|
||||
})
|
||||
|
||||
source.onNext(1)
|
||||
source.onNext(1)
|
||||
assertEquals(2, heartBeatOnNext)
|
||||
assertEquals(2, heartBeatOnError)
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test ResilientSubscriber throws an OnNextFailedException which is a OnErrorNotImplementedException.
|
||||
* Because its underlying subscriber is not an ActionSubscriber, it will not be considered as a leaf ResilientSubscriber.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `throwing ResilientSubscriber at onNext will wrap with a Rx OnErrorNotImplementedException`() {
|
||||
val resilientSubscriber = ResilientSubscriber<Int>(Subscribers.create { throw IllegalStateException() })
|
||||
assertFailsWith<OnErrorNotImplementedException> { // actually fails with an OnNextFailedException
|
||||
resilientSubscriber.onNext(1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `throwing inside ResilientSubscriber onError will wrap with a Rx OnErrorFailedException`() {
|
||||
val resilientSubscriber = ResilientSubscriber<Int>(
|
||||
ActionSubscriber(
|
||||
{ throw IllegalStateException() },
|
||||
{ throw IllegalStateException() },
|
||||
null
|
||||
)
|
||||
)
|
||||
assertFailsWith<OnErrorFailedException> {
|
||||
resilientSubscriber.onNext(1)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test we create a chain of Subscribers with this the following order:
|
||||
* ResilientSubscriber_X -> PublishSubject -> ResilientSubscriber_Y
|
||||
*
|
||||
* ResilientSubscriber_Y.onNext throws an error, since ResilientSubscriber_Y.onError is not defined,
|
||||
* it will throw a OnErrorNotImplementedException. Then it will be propagated back until ResilientSubscriber_X.
|
||||
* ResilientSubscriber_X will identify it is a not leaf subscriber and therefore will rethrow it as OnNextFailedException.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `propagated Rx exception will be rethrown at ResilientSubscriber onError`() {
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.continueOnError().subscribe { throw IllegalStateException("123") } // will give a leaf ResilientSubscriber
|
||||
val sourceWrapper = ResilientSubscriber(Subscribers.from(source)) // will give an inner ResilientSubscriber
|
||||
|
||||
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped") {
|
||||
// IllegalStateException will be wrapped and rethrown as a OnErrorNotImplementedException in leaf ResilientSubscriber,
|
||||
// will be caught by inner ResilientSubscriber and just be rethrown
|
||||
sourceWrapper.onNext(1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test OnResilientSubscribe strictMode = true replaces SafeSubscriber subclass`() {
|
||||
var heartBeat = 0
|
||||
val customSafeSubscriber = CustomSafeSubscriber(
|
||||
Subscribers.create<Int> {
|
||||
heartBeat++
|
||||
throw IllegalArgumentException()
|
||||
})
|
||||
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.continueOnError().subscribe(customSafeSubscriber) // it should replace CustomSafeSubscriber with ResilientSubscriber
|
||||
|
||||
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
|
||||
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
|
||||
assertEquals(2, heartBeat)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test OnResilientSubscribe strictMode = false will not replace SafeSubscriber subclass`() {
|
||||
var heartBeat = 0
|
||||
val customSafeSubscriber = CustomSafeSubscriber(
|
||||
Subscribers.create<Int> {
|
||||
heartBeat++
|
||||
throw IllegalArgumentException()
|
||||
})
|
||||
|
||||
val source = PublishSubject.create<Int>()
|
||||
source.resilientOnError().subscribe(customSafeSubscriber) // it should not replace CustomSafeSubscriber with ResilientSubscriber
|
||||
|
||||
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
|
||||
source.onNext(1)
|
||||
assertEquals(1, heartBeat)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `combine tee and bufferUntilDatabaseCommit`() {
|
||||
val database = createDatabase()
|
||||
@ -359,4 +565,6 @@ class ObservablesTests {
|
||||
|
||||
subscription3.unsubscribe()
|
||||
}
|
||||
|
||||
class CustomSafeSubscriber<T>(actual: Subscriber<in T>): SafeSubscriber<T>(actual)
|
||||
}
|
@ -1,12 +1,13 @@
|
||||
package com.r3.dbfailure.contracts
|
||||
|
||||
import com.r3.dbfailure.schemas.DbFailureSchemaV1
|
||||
import net.corda.core.contracts.CommandAndState
|
||||
import net.corda.core.contracts.CommandData
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.LinearState
|
||||
import net.corda.core.contracts.OwnableState
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
@ -21,23 +22,31 @@ class DbFailureContract : Contract {
|
||||
|
||||
class TestState(
|
||||
override val linearId: UniqueIdentifier,
|
||||
val particpant: Party,
|
||||
override val participants: List<AbstractParty>,
|
||||
val randomValue: String?,
|
||||
val errorTarget: Int = 0
|
||||
) : LinearState, QueryableState {
|
||||
val errorTarget: Int = 0,
|
||||
override val owner: AbstractParty
|
||||
) : LinearState, QueryableState, OwnableState {
|
||||
|
||||
override val participants: List<AbstractParty> = listOf(particpant)
|
||||
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(DbFailureSchemaV1)
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return if (schema is DbFailureSchemaV1){
|
||||
DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id)
|
||||
DbFailureSchemaV1.PersistentTestState( participants.toString(), randomValue, errorTarget, linearId.id)
|
||||
}
|
||||
else {
|
||||
throw IllegalArgumentException("Unsupported schema $schema")
|
||||
}
|
||||
}
|
||||
|
||||
override fun withNewOwner(newOwner: AbstractParty): CommandAndState {
|
||||
return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, this.errorTarget, newOwner))
|
||||
}
|
||||
|
||||
fun withNewOwnerAndErrorTarget(newOwner: AbstractParty, errorTarget: Int): CommandAndState {
|
||||
return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, errorTarget, newOwner))
|
||||
}
|
||||
}
|
||||
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
@ -46,5 +55,6 @@ class DbFailureContract : Contract {
|
||||
|
||||
interface Commands : CommandData{
|
||||
class Create: Commands
|
||||
class Send : Commands
|
||||
}
|
||||
}
|
@ -20,14 +20,16 @@ object CreateStateFlow {
|
||||
// 1000s control exception handlling in the service/vault listener
|
||||
enum class ErrorTarget(val targetNumber: Int) {
|
||||
NoError(0),
|
||||
ServiceSqlSyntaxError(1),
|
||||
ServiceNullConstraintViolation(2),
|
||||
ServiceValidUpdate(3),
|
||||
ServiceReadState(4),
|
||||
ServiceCheckForState(5),
|
||||
ServiceThrowInvalidParameter(6),
|
||||
ServiceThrowMotherOfAllExceptions(7),
|
||||
ServiceThrowUnrecoverableError(8),
|
||||
ServiceSqlSyntaxError(10000),
|
||||
ServiceNullConstraintViolation(20000),
|
||||
ServiceValidUpdate(30000),
|
||||
ServiceReadState(40000),
|
||||
ServiceCheckForState(50000),
|
||||
ServiceThrowInvalidParameter(60000),
|
||||
ServiceThrowMotherOfAllExceptions(70000),
|
||||
ServiceThrowUnrecoverableError(80000),
|
||||
ServiceSqlSyntaxErrorOnConsumed(90000),
|
||||
ServiceConstraintViolationException(1000000),
|
||||
TxInvalidState(10),
|
||||
FlowSwallowErrors(100),
|
||||
ServiceSwallowErrors(1000)
|
||||
@ -40,7 +42,7 @@ object CreateStateFlow {
|
||||
private val targetMap = ErrorTarget.values().associateBy(ErrorTarget::targetNumber)
|
||||
|
||||
fun getServiceTarget(target: Int?): ErrorTarget {
|
||||
return target?.let { targetMap.getValue(it % 10) } ?: CreateStateFlow.ErrorTarget.NoError
|
||||
return target?.let { targetMap.getValue(((it/10000) % 1000)*10000) } ?: CreateStateFlow.ErrorTarget.NoError
|
||||
}
|
||||
|
||||
fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget {
|
||||
@ -69,10 +71,11 @@ object CreateStateFlow {
|
||||
val txTarget = getTxTarget(errorTarget)
|
||||
logger.info("Test flow: The tx error target is $txTarget")
|
||||
val state = DbFailureContract.TestState(
|
||||
UniqueIdentifier(),
|
||||
ourIdentity,
|
||||
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
|
||||
errorTarget)
|
||||
UniqueIdentifier(),
|
||||
listOf(ourIdentity),
|
||||
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
|
||||
errorTarget, ourIdentity
|
||||
)
|
||||
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
|
||||
|
||||
logger.info("Test flow: tx builder")
|
||||
|
@ -4,33 +4,145 @@ import com.r3.dbfailure.contracts.DbFailureContract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.hibernate.exception.ConstraintViolationException
|
||||
import rx.Subscriber
|
||||
import rx.observers.SafeSubscriber
|
||||
import rx.observers.Subscribers
|
||||
import java.lang.IllegalStateException
|
||||
import java.security.InvalidParameterException
|
||||
import java.sql.SQLException
|
||||
|
||||
@CordaService
|
||||
class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
||||
|
||||
companion object {
|
||||
val log = contextLogger()
|
||||
|
||||
var onError: ((Throwable) -> Unit)? = null
|
||||
|
||||
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
|
||||
var throwUnrecoverableError = false
|
||||
|
||||
var safeSubscription = true
|
||||
var withCustomSafeSubscriber = false
|
||||
|
||||
var onNextVisited: (Party) -> Unit = {}
|
||||
var onErrorVisited: ((Party) -> Unit)? = null
|
||||
}
|
||||
|
||||
init {
|
||||
val onNext: (Vault.Update<ContractState>) -> Unit =
|
||||
{ (_, produced) ->
|
||||
produced.forEach {
|
||||
val contractState = it.state.data as? DbFailureContract.TestState
|
||||
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
|
||||
try {
|
||||
{ (consumed, produced) ->
|
||||
|
||||
onNextVisited(services.myInfo.legalIdentities.first())
|
||||
|
||||
produced.forEach {
|
||||
val contractState = it.state.data as? DbFailureContract.TestState
|
||||
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
|
||||
try {
|
||||
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
|
||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
|
||||
log.info("Fail with syntax error on raw statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"BLAAA RANDOM_VALUE = NULL\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
|
||||
log.info("Fail with null constraint violation on raw statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"SET RANDOM_VALUE = NULL\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
|
||||
log.info("Update current statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceReadState -> {
|
||||
log.info("Read current state from db")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"SELECT * FROM FAIL_TEST_STATES \n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
|
||||
log.info("Check for currently written state in the db")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
val rs = statement.executeQuery(
|
||||
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
||||
log.info(
|
||||
"Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
|
||||
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}"
|
||||
)
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
|
||||
log.info("Throw InvalidParameterException")
|
||||
throw InvalidParameterException("Toys out of pram")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
|
||||
log.info("Throw Exception")
|
||||
throw Exception("Mother of all exceptions")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
|
||||
// this bit of code should only work in a OutOfProcess node,
|
||||
// otherwise it will kill the testing jvm (including the testing thread)
|
||||
if (throwUnrecoverableError) {
|
||||
log.info("Throw Unrecoverable error")
|
||||
throw OutOfMemoryError("Unrecoverable error")
|
||||
}
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException -> {
|
||||
log.info("Throw ConstraintViolationException")
|
||||
throw ConstraintViolationException("Dummy Hibernate Exception ", SQLException(), " Will cause flow retry!")
|
||||
}
|
||||
else -> {
|
||||
// do nothing, everything else must be handled elsewhere
|
||||
}
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
|
||||
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors
|
||||
) {
|
||||
log.warn("Service not letting errors escape", t)
|
||||
} else {
|
||||
throw t
|
||||
}
|
||||
}
|
||||
}
|
||||
consumed.forEach {
|
||||
val contractState = it.state.data as? DbFailureContract.TestState
|
||||
log.info("Test Service: Got state ${if (contractState == null) "null" else " test state with error target ${contractState.errorTarget}"}")
|
||||
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
|
||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
|
||||
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed -> {
|
||||
log.info("Fail with syntax error on raw statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
@ -41,85 +153,33 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
|
||||
log.info("Fail with null constraint violation on raw statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"SET RANDOM_VALUE = NULL\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
|
||||
log.info("Update current statement")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"UPDATE FAIL_TEST_STATES \n" +
|
||||
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceReadState -> {
|
||||
log.info("Read current state from db")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
statement.execute(
|
||||
"SELECT * FROM FAIL_TEST_STATES \n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
log.info("SQL result: ${statement.resultSet}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
|
||||
log.info("Check for currently written state in the db")
|
||||
val session = services.jdbcSession()
|
||||
val statement = session.createStatement()
|
||||
val rs = statement.executeQuery(
|
||||
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
|
||||
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
|
||||
)
|
||||
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
|
||||
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
|
||||
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
|
||||
log.info("Throw InvalidParameterException")
|
||||
throw InvalidParameterException("Toys out of pram")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
|
||||
log.info("Throw Exception")
|
||||
throw Exception("Mother of all exceptions")
|
||||
}
|
||||
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
|
||||
// this bit of code should only work in a OutOfProcess node,
|
||||
// otherwise it will kill the testing jvm (including the testing thread)
|
||||
if (throwUnrecoverableError) {
|
||||
log.info("Throw Unrecoverable error")
|
||||
throw OutOfMemoryError("Unrecoverable error")
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
// do nothing, everything else must be handled elsewhere
|
||||
}
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
|
||||
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
|
||||
log.warn("Service not letting errors escape", t)
|
||||
} else {
|
||||
throw t
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (onError != null) {
|
||||
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
|
||||
val onErrorWrapper: ((Throwable) -> Unit)? = {
|
||||
onErrorVisited?.let {
|
||||
it(services.myInfo.legalIdentities.first())
|
||||
}
|
||||
onError!!(it)
|
||||
}
|
||||
services.vaultService.rawUpdates.subscribe(onNext, onErrorWrapper) // onError is defined
|
||||
} else if (onErrorVisited != null) {
|
||||
throw IllegalStateException("A DbListenerService.onError needs to be defined!")
|
||||
} else {
|
||||
services.vaultService.rawUpdates.subscribe(onNext)
|
||||
if (safeSubscription) {
|
||||
if (withCustomSafeSubscriber) {
|
||||
services.vaultService.rawUpdates.subscribe(CustomSafeSubscriber(Subscribers.create(onNext)))
|
||||
} else {
|
||||
services.vaultService.rawUpdates.subscribe(onNext)
|
||||
}
|
||||
} else {
|
||||
services.vaultService.rawUpdates.unsafeSubscribe(Subscribers.create(onNext))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -130,4 +190,6 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
|
||||
throwUnrecoverableError = true
|
||||
}
|
||||
}
|
||||
|
||||
class CustomSafeSubscriber<T>(actual: Subscriber<in T>): SafeSubscriber<T>(actual)
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
package com.r3.dbfailure.workflows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3.dbfailure.contracts.DbFailureContract
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
import net.corda.core.flows.FinalityFlow
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.ReceiveFinalityFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.unwrap
|
||||
|
||||
object SendStateFlow {
|
||||
|
||||
/**
|
||||
* Creates a [DbFailureContract.TestState], signs it, collects a signature from a separate node and then calls [FinalityFlow] flow.
|
||||
* Can throw in various stages
|
||||
*/
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class PassErroneousOwnableState(private val stateId: UniqueIdentifier, private val errorTarget: Int, private val counterParty: Party) :
|
||||
FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
logger.info("Test flow: starting")
|
||||
val notary = serviceHub.networkMapCache.notaryIdentities[0]
|
||||
logger.info("Test flow: create counterparty session")
|
||||
val recipientSession = initiateFlow(counterParty)
|
||||
|
||||
val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(stateId), status = Vault.StateStatus.UNCONSUMED)
|
||||
val inputState = serviceHub.vaultService.queryBy(DbFailureContract.TestState::class.java, queryCriteria).states.singleOrNull()
|
||||
?: throw FlowException("Failed to find single state for linear id $stateId")
|
||||
|
||||
logger.info("Test flow: tx builder")
|
||||
val commandAndState = inputState.state.data.withNewOwnerAndErrorTarget(counterParty, errorTarget)
|
||||
val txBuilder = TransactionBuilder(notary)
|
||||
.addInputState(inputState)
|
||||
.addOutputState(commandAndState.ownableState)
|
||||
.addCommand(commandAndState.command, listOf(ourIdentity.owningKey, counterParty.owningKey))
|
||||
|
||||
|
||||
logger.info("Test flow: verify")
|
||||
txBuilder.verify(serviceHub)
|
||||
|
||||
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||
|
||||
logger.info("Test flow: send for counterparty signing")
|
||||
recipientSession.send(signedTx)
|
||||
logger.info("Test flow: Waiting to receive counter signed transaction")
|
||||
val counterSignedTx = recipientSession.receive<SignedTransaction>().unwrap { it }
|
||||
logger.info("Test flow: Received counter sigend transaction, invoking finality")
|
||||
subFlow(FinalityFlow(counterSignedTx, recipientSession))
|
||||
|
||||
logger.info("Test flow: Finishing")
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(PassErroneousOwnableState::class)
|
||||
class PassErroneousOwnableStateReceiver(private val otherSide: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
logger.info("Test flow counterparty: starting")
|
||||
val signedTx = otherSide.receive<SignedTransaction>().unwrap { it }
|
||||
logger.info("Test flow counterparty: received TX, signing")
|
||||
val counterSignedTx = serviceHub.addSignature(signedTx)
|
||||
logger.info("Test flow counterparty: calling hookBeforeCounterPartyAnswers")
|
||||
logger.info("Test flow counterparty: Answer with countersigned transaction")
|
||||
otherSide.send(counterSignedTx)
|
||||
logger.info("Test flow counterparty: calling hookAfterCounterPartyAnswers")
|
||||
// Not ideal that we have to do this check, but we must as FinalityFlow does not send locally
|
||||
if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) {
|
||||
logger.info("Test flow counterparty: Waiting for finality")
|
||||
subFlow(ReceiveFinalityFlow(otherSide))
|
||||
}
|
||||
logger.info("Test flow counterparty: Finishing")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -30,9 +30,10 @@ object ErrorHandling {
|
||||
val txTarget = CreateStateFlow.getTxTarget(errorTarget)
|
||||
val state = DbFailureContract.TestState(
|
||||
UniqueIdentifier(),
|
||||
ourIdentity,
|
||||
listOf(ourIdentity),
|
||||
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value",
|
||||
errorTarget)
|
||||
errorTarget,
|
||||
ourIdentity)
|
||||
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
|
||||
val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand)
|
||||
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||
@ -50,5 +51,4 @@ object ErrorHandling {
|
||||
hookAfterSecondCheckpoint.invoke() // should be never executed
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user