diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index fcfd9a5986..3d170290b5 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -192,6 +192,9 @@ fun Observable.bufferUntilSubscribed(): Observable { @DeleteForDJVM fun Observer.tee(vararg teeTo: Observer): Observer { val subject = PublishSubject.create() + // use unsafe subscribe, so that the teed subscribers will not get wrapped with SafeSubscribers, + // therefore a potential raw exception (non Rx) coming from a child -unsafe subscribed- observer + // will not unsubscribe all of the subscribers under the PublishSubject. subject.unsafeSubscribe(Subscribers.from(this)) teeTo.forEach { subject.unsafeSubscribe(Subscribers.from(it)) } return subject diff --git a/core/src/main/kotlin/net/corda/core/observable/Observables.kt b/core/src/main/kotlin/net/corda/core/observable/Observables.kt new file mode 100644 index 0000000000..c299ab8401 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/observable/Observables.kt @@ -0,0 +1,13 @@ +@file:JvmName("Observables") +package net.corda.core.observable + +import net.corda.core.observable.internal.OnResilientSubscribe +import rx.Observable + +/** + * [Observable.continueOnError] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s + * to the source [Observable]. Namely, it makes the [rx.Observer]s resilient to exceptions coming out of [rx.Observer.onNext]. + * + * [Observable.continueOnError] should be called before every subscribe to have the aforementioned effect. + */ +fun Observable.continueOnError(): Observable = Observable.unsafeCreate(OnResilientSubscribe(this, true)) \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/observable/internal/ResilientSubscriber.kt b/core/src/main/kotlin/net/corda/core/observable/internal/ResilientSubscriber.kt new file mode 100644 index 0000000000..074a17a719 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/observable/internal/ResilientSubscriber.kt @@ -0,0 +1,120 @@ +package net.corda.core.observable.internal + +import net.corda.core.internal.VisibleForTesting +import rx.Observable +import rx.Observer +import rx.Subscriber +import rx.exceptions.CompositeException +import rx.exceptions.Exceptions +import rx.exceptions.OnErrorFailedException +import rx.exceptions.OnErrorNotImplementedException +import rx.internal.util.ActionSubscriber +import rx.observers.SafeSubscriber +import rx.plugins.RxJavaHooks +import rx.plugins.RxJavaPlugins +import rx.subjects.Subject + +/** + * Extends [SafeSubscriber] to override [SafeSubscriber.onNext], [SafeSubscriber.onError] and [SafeSubscriber._onError]. + * + * [ResilientSubscriber] will not set [SafeSubscriber.done] flag to true nor will call [SafeSubscriber.unsubscribe] upon + * error inside [Observer.onNext]. This way, the [ResilientSubscriber] will not get unsubscribed and therefore the underlying [Observer] + * will not get removed. + * + * An [Observer] that will not get removed due to errors in [onNext] events becomes useful when an unsubscribe could + * lead to a malfunctioning CorDapp, due to a single isolated error. If the [Observer] gets removed, + * it will no longer be available the next time any events are pushed from the base [Subject]. + */ +@VisibleForTesting +class ResilientSubscriber(actual: Subscriber) : SafeSubscriber(actual) { + + /** + * Duplicate of [SafeSubscriber.onNext]. However, it ignores [SafeSubscriber.done] flag. + * It only delegates to [SafeSubscriber.onError] if it wraps an [ActionSubscriber] which is + * a leaf in an Subscribers' tree structure. + */ + @Suppress("TooGenericExceptionCaught") + override fun onNext(t: T) { + try { + actual.onNext(t) + } catch (e: Throwable) { + if (actual is ActionSubscriber) { + // this Subscriber wraps an ActionSubscriber which is always a leaf Observer, then call user-defined onError + Exceptions.throwOrReport(e, this) + } else { + // this Subscriber may wrap a non leaf Observer. In case the wrapped Observer is a PublishSubject then we + // should not call onError because PublishSubjectState.onError will shut down all of the Observers under it + throw OnNextFailedException( + "Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped", e + ) + } + } + } + + /** + * Duplicate of [SafeSubscriber.onError]. However, it will not set [SafeSubscriber.done] flag to true. + */ + override fun onError(e: Throwable) { + Exceptions.throwIfFatal(e) + _onError(e) + } + + /** + * Duplicate of [SafeSubscriber._onError]. However, it will not call [Subscriber.unsubscribe]. + */ + @Suppress("TooGenericExceptionCaught") + override fun _onError(e: Throwable) { + @Suppress("DEPRECATION") + RxJavaPlugins.getInstance().errorHandler.handleError(e) + try { + actual.onError(e) + } catch (e: OnErrorNotImplementedException) { + throw e + } catch (e2: Throwable) { + RxJavaHooks.onError(e2) + throw OnErrorFailedException( + "Error occurred when trying to propagate error to Observer.onError", CompositeException(listOf(e, e2)) + ) + } + } +} + +/** + * We throw [OnNextFailedException] to pass the exception back through the preceding [Subscriber] chain + * without triggering any [SafeSubscriber.onError]s. Since we are extending an [OnErrorNotImplementedException] + * the exception will be re-thrown at [Exceptions.throwOrReport]. + */ +@VisibleForTesting +class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause) + +/** + * [OnResilientSubscribe] returns an [Observable] holding a reference to the source [Observable]. Upon subscribing to it, + * when reaching [call] method, if the subscriber passed in [isSafeSubscriber] it will unwrap the [Observer] from + * the [SafeSubscriber], re-wrap it with [ResilientSubscriber] and then subscribe it to the source [Observable]. + * + * In case we need to subscribe with a [SafeSubscriber] to the source [Observable] via [OnResilientSubscribe], we have to: + * 1. Declare a custom SafeSubscriber extending [SafeSubscriber]. + * 2. Wrap our [rx.Observer] -to be subscribed to the source [Observable]- with the custom SafeSubscriber. + * 3. Create a [OnResilientSubscribe] object with [strictMode] = false. + * 3. Call [Observable.unsafeCreate] passing in as argument the [OnResilientSubscribe]. + * 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber. + */ +class OnResilientSubscribe(val source: Observable, private val strictMode: Boolean): Observable.OnSubscribe { + + override fun call(subscriber: Subscriber) { + if (isSafeSubscriber(subscriber)) { + source.unsafeSubscribe(ResilientSubscriber((subscriber as SafeSubscriber).actual)) + } else { + source.unsafeSubscribe(subscriber) + } + } + + private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean { + return if (strictMode) { + // In strictMode mode we capture SafeSubscriber subclasses as well + SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java) + } else { + subscriber::class == SafeSubscriber::class + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index ac7bc8383e..5cd4529c6d 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -1,34 +1,49 @@ package net.corda.node.services.vault import co.paralleluniverse.strands.concurrent.Semaphore +import com.r3.dbfailure.contracts.DbFailureContract import com.r3.dbfailure.workflows.CreateStateFlow -import com.r3.dbfailure.workflows.CreateStateFlow.Initiator import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum import com.r3.dbfailure.workflows.DbListenerService import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow +import com.r3.dbfailure.workflows.SendStateFlow import com.r3.transactionfailure.workflows.ErrorHandling import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow +import net.corda.core.CordaRuntimeException +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.startFlow +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.Permissions import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.node.User import net.corda.testing.node.internal.findCordapp +import org.assertj.core.api.Assertions import org.junit.After import org.junit.Assert import org.junit.Test import java.lang.IllegalStateException import java.sql.SQLException +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import javax.persistence.PersistenceException +import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue @@ -49,6 +64,10 @@ class VaultObserverExceptionTest { StaffedFlowHospital.onFlowKeptForOvernightObservation.clear() StaffedFlowHospital.onFlowAdmitted.clear() DbListenerService.onError = null + DbListenerService.safeSubscription = true + DbListenerService.onNextVisited = {} + DbListenerService.onErrorVisited = null + DbListenerService.withCustomSafeSubscriber = false } /** @@ -74,9 +93,43 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() aliceNode.rpc.startFlow( - ::Initiator, - "Syntax Error in Custom SQL", - CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) + CreateStateFlow::Initiator, + "Syntax Error in Custom SQL", + CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) + ).returnValue.then { testControlFuture.complete(false) } + val foundExpectedException = testControlFuture.getOrThrow(30.seconds) + + Assert.assertTrue(foundExpectedException) + } + } + + /** + * Causing an SqlException via a syntax error in a vault observer causes the flow to hit the + * DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation - Unsafe subscribe + */ + @Test(timeout=300_000) + fun unhandledSqlExceptionFromVaultObserverGetsHospitalisedUnsafeSubscription() { + DbListenerService.safeSubscription = false + val testControlFuture = openFuture().toCompletableFuture() + + StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { + when (it) { + is SQLException -> { + testControlFuture.complete(true) + } + } + false + } + + driver(DriverParameters( + startNodesInProcess = true, + cordappsForAllNodes = testCordapps())) { + val aliceUser = User("user", "foo", setOf(Permissions.all())) + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() + aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "Syntax Error in Custom SQL", + CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) ).returnValue.then { testControlFuture.complete(false) } val foundExpectedException = testControlFuture.getOrThrow(30.seconds) @@ -103,7 +156,7 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) waitUntilHospitalised.acquire() // wait here until flow gets hospitalised @@ -131,9 +184,9 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum( - CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, - CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, + CreateStateFlow.ErrorTarget.FlowSwallowErrors)) waitUntilHospitalised.acquire() // wait here until flow gets hospitalised } @@ -167,7 +220,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() assertFailsWith("PersistenceException") { - aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum( + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", errorTargetsToNum( CreateStateFlow.ErrorTarget.TxInvalidState)) .returnValue.getOrThrow(30.seconds) } @@ -200,7 +253,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() val flowHandle = aliceNode.rpc.startFlow( - ::Initiator, "EntityManager", + CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) @@ -228,7 +281,7 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) val flowResult = flowHandle.returnValue @@ -251,7 +304,7 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError, CreateStateFlow.ErrorTarget.ServiceSwallowErrors)) val flowResult = flowHandle.returnValue @@ -281,20 +334,20 @@ class VaultObserverExceptionTest { } driver(DriverParameters( - inMemoryDB = false, - startNodesInProcess = true, - isDebug = true, - cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), - findCordapp("com.r3.dbfailure.workflows"), - findCordapp("com.r3.transactionfailure.workflows"), - findCordapp("com.r3.dbfailure.schemas")))) { + inMemoryDB = false, + startNodesInProcess = true, + isDebug = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.transactionfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas")))) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception CreateStateFlow.ErrorTarget.FlowSwallowErrors - ) + ) ) waitUntilHospitalised.acquire() @@ -329,9 +382,9 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( - CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, - CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, + CreateStateFlow.ErrorTarget.FlowSwallowErrors)) waitUntilHospitalised.acquire() // wait here until flow gets hospitalised } @@ -344,7 +397,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow() aliceNode.rpc.startFlow(::MakeServiceThrowErrorFlow).returnValue.getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum( + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError)) val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS) @@ -363,4 +416,459 @@ class VaultObserverExceptionTest { } } + /** + * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction + * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the + * counterparty node. + * + * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe. + * + * Check onNext is visited the correct number of times. + * + * This test causes 2 failures inside of the observer to ensure that the observer is still subscribed. + */ + @Test(timeout=300_000) + fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext check`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ),inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenConsumingState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed) + ).returnValue.getOrThrow(30.seconds) + + println("Created new state") + + val flowHandle = aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, // throws at consumed state -> should end up in hospital -> flow should hang + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError), + bobNode.nodeInfo.legalIdentities.first() + ) + + Assertions.assertThatExceptionOfType(TimeoutException::class.java) + .isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) } + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + println("First set of flows") + val stateId = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + + println("Second set of flows") + val stateId2 = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(2, observationCounter) + assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + } + } + + /** + * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction + * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the + * counterparty node. + * + * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe. + * + * Check onNext and onError are visited the correct number of times. + * + * This test causes 2 failures inside of the observer to ensure that the observer is still subscribed. + */ + @Test(timeout=300_000) + fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext and onError check`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it} + DbListenerService.onErrorVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenConsumingState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + // should be a hospital exception + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed) + ).returnValue.getOrThrow(30.seconds) + + val flowHandle = aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError), + bobNode.nodeInfo.legalIdentities.first() + ) + + Assertions.assertThatExceptionOfType(TimeoutException::class.java) + .isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) } + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + val stateId = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + + val stateId2 = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(2, observationCounter) + assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + } + } + + /** + * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the counterparty node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. + * Observer events are recorded on both the initiating node and the counterparty node. + * + * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe. + * + * This test causes 2 failures inside of the observer to ensure that the observer is still subscribed. + */ + @Test(timeout=300_000) + fun `Throw user error in VaultService rawUpdates during counterparty FinalityFlow blows up the flow but does not break the Observer`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenCreatingSecondState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError) + ).returnValue.getOrThrow(30.seconds) + + aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError), + bobNode.nodeInfo.legalIdentities.first() + ).returnValue.getOrThrow(20.seconds) + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + val stateId = startErrorInObservableWhenCreatingSecondState() + assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(1, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()]) + + val stateId2 = startErrorInObservableWhenCreatingSecondState() + assertEquals(1, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(2, observationCounter) + assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(2, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()]) + } + } + + /** + * An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction + * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the + * counterparty node. + * + * More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe. + * + * This test causes 2 failures inside of the [rx.Observer] to ensure that the Observer is still subscribed. + */ + @Test(timeout=300_000) + fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenConsumingState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed) + ).returnValue.getOrThrow(30.seconds) + + val flowHandle = aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError), + bobNode.nodeInfo.legalIdentities.first() + ) + + Assertions.assertThatExceptionOfType(TimeoutException::class.java) + .isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) } + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + val stateId = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + + val stateId2 = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + } + } + + @Test(timeout=300_000) + fun `Accessing NodeVaultService rawUpdates from a flow is not allowed` () { + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + val flowHandle = aliceNode.rpc.startFlow(::SubscribingRawUpdatesFlow) + + assertFailsWith( + "Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to access VaultService.rawUpdates " + + "- Rx.Observables should only be accessed outside the context of a flow " + ) { + flowHandle.returnValue.getOrThrow(30.seconds) + } + } + } + + @Test(timeout=300_000) + fun `Failing Observer wrapped with ResilientSubscriber will survive and be re-called upon flow retry`() { + var onNextCount = 0 + var onErrorCount = 0 + DbListenerService.onNextVisited = { _ -> onNextCount++ } + DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it} + DbListenerService.onErrorVisited = { _ -> onErrorCount++ } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.transactionfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas")), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + assertFailsWith { + aliceNode.rpc.startFlow( + ErrorHandling::CheckpointAfterErrorFlow, + CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceConstraintViolationException, + CreateStateFlow.ErrorTarget.FlowSwallowErrors + ) + ).returnValue.getOrThrow(20.seconds) + } + + assertEquals(4, onNextCount) + assertEquals(4, onErrorCount) + } + } + + @Test(timeout=300_000) + fun `Users may subscribe to NodeVaultService rawUpdates with their own custom SafeSubscribers`() { + var onNextCount = 0 + DbListenerService.onNextVisited = { _ -> onNextCount++ } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.transactionfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas")), + inMemoryDB = false) + ) { + // Subscribing with custom SafeSubscriber; the custom SafeSubscriber will not get replaced by a ResilientSubscriber + // meaning that it will behave as a SafeSubscriber; it will get unsubscribed upon throwing an error. + // Because we throw a ConstraintViolationException, the Rx Observer will get unsubscribed but the flow will retry + // from previous checkpoint, however the Observer will no longer be there. + DbListenerService.withCustomSafeSubscriber = true + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + aliceNode.rpc.startFlow( + ErrorHandling::CheckpointAfterErrorFlow, + CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceConstraintViolationException, + CreateStateFlow.ErrorTarget.FlowSwallowErrors + ) + ).returnValue.getOrThrow(20.seconds) + + assertEquals(1, onNextCount) + } + } + + private fun NodeHandle.getNotarisedTransactionIds(): List { + + @StartableByRPC + class NotarisedTxs : FlowLogic>() { + override fun call(): List { + val session = serviceHub.jdbcSession() + val statement = session.createStatement() + statement.execute("SELECT TRANSACTION_ID FROM NODE_NOTARY_COMMITTED_TXS;") + val result = mutableListOf() + while (statement.resultSet.next()) { + result.add(statement.resultSet.getString(1)) + } + return result + } + } + + return rpc.startFlowDynamic(NotarisedTxs::class.java).returnValue.getOrThrow() + } + + private fun NodeHandle.getStatesById(id: UniqueIdentifier?, status: Vault.StateStatus): List> { + return rpc.vaultQueryByCriteria( + QueryCriteria.LinearStateQueryCriteria( + linearId = if (id != null) listOf(id) else null, + status = status + ), DbFailureContract.TestState::class.java + ).states + } + + private fun NodeHandle.getAllStates(status: Vault.StateStatus): List> { + return getStatesById(null, status) + } + + @StartableByRPC + class SubscribingRawUpdatesFlow: FlowLogic() { + override fun call() { + logger.info("Accessing rawUpdates within a flow will throw! ") + val rawUpdates = serviceHub.vaultService.rawUpdates // throws + logger.info("Code flow should never reach this logging or the following segment! ") + rawUpdates.subscribe { + println("Code flow should never get in here!") + } + } + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index e60fd10cb4..438e2c8dec 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -2,6 +2,7 @@ package net.corda.node.services.vault import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand +import net.corda.core.CordaRuntimeException import net.corda.core.contracts.* import net.corda.core.crypto.SecureHash import net.corda.core.crypto.containsAny @@ -13,6 +14,7 @@ import net.corda.core.node.StatesToRecord import net.corda.core.node.services.* import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo import net.corda.core.node.services.vault.* +import net.corda.core.observable.internal.OnResilientSubscribe import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.* @@ -209,7 +211,27 @@ class NodeVaultService( } override val rawUpdates: Observable> - get() = mutex.locked { _rawUpdatesPublisher } + get() = mutex.locked { + FlowStateMachineImpl.currentStateMachine()?.let { + // we are inside a flow; we cannot allow flows to subscribe Rx Observers, + // because the Observer could reference flow's properties, essentially fiber's properties then, + // since it does not unsubscribe on flow's/ fiber's completion, + // it could prevent the flow/ fiber -object- get garbage collected. + log.error( + "Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " + + "- Rx.Observables should only be accessed outside the context of a flow " + + "- aborting the flow " + ) + + throw CordaRuntimeException( + "Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " + + "- Rx.Observables should only be accessed outside the context of a flow " + ) + } + // we are not inside a flow, we are most likely inside a CordaService; + // we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates. + return _rawUpdatesPublisher.resilientOnError() + } override val updates: Observable> get() = mutex.locked { _updatesInDbTx } @@ -414,7 +436,7 @@ class NodeVaultService( HospitalizeFlowException(wrapped) } } - } ?: HospitalizeFlowException(e) + } ?: (e as? SQLException ?: (e as? PersistenceException ?: HospitalizeFlowException(e))) } } } @@ -795,4 +817,7 @@ class NodeVaultService( } return myTypes } -} \ No newline at end of file +} + +/** The Observable returned allows subscribing with custom SafeSubscribers to source [Observable]. */ +internal fun Observable.resilientOnError(): Observable = Observable.unsafeCreate(OnResilientSubscribe(this, false)) \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt index e9817111d3..13f92edb18 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ObservablesTests.kt @@ -3,6 +3,10 @@ package net.corda.node.utilities import com.google.common.util.concurrent.SettableFuture import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.tee +import net.corda.core.observable.internal.ResilientSubscriber +import net.corda.core.observable.internal.OnNextFailedException +import net.corda.core.observable.continueOnError +import net.corda.node.services.vault.resilientOnError import net.corda.nodeapi.internal.persistence.* import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties @@ -10,9 +14,17 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Test import rx.Observable +import rx.Subscriber +import rx.exceptions.CompositeException +import rx.exceptions.OnErrorFailedException +import rx.exceptions.OnErrorNotImplementedException +import rx.internal.util.ActionSubscriber +import rx.observers.SafeSubscriber import rx.observers.Subscribers import rx.subjects.PublishSubject import java.io.Closeable +import java.lang.IllegalArgumentException +import java.lang.IllegalStateException import java.lang.RuntimeException import java.util.* import kotlin.test.assertEquals @@ -194,7 +206,7 @@ class ObservablesTests { * tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber]. * Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the * SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will - * eventually shut down all of the subscribers under that PublishSubjectState. + * eventually shut down all of the subscribers under that PublishSubject. */ @Test(timeout=300_000) fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() { @@ -214,6 +226,200 @@ class ObservablesTests { assertEquals(2, count) } + @Test(timeout=300_000) + fun `continueOnError subscribes ResilientSubscribers, wrapped Observers will survive errors from onNext`() { + var heartBeat1 = 0 + var heartBeat2 = 0 + val source = PublishSubject.create() + val continueOnError = source.continueOnError() + continueOnError.subscribe { runNo -> + // subscribes with a ResilientSubscriber + heartBeat1++ + if (runNo == 1) { + throw IllegalStateException() + } + } + continueOnError.subscribe { runNo -> + // subscribes with a ResilientSubscriber + heartBeat2++ + if (runNo == 2) { + throw IllegalStateException() + } + } + + assertFailsWith { + source.onNext(1) // first observer only will run and throw + } + assertFailsWith { + source.onNext(2) // first observer will run, second observer will run and throw + } + source.onNext(3) // both observers will run + assertEquals(3, heartBeat1) + assertEquals(2, heartBeat2) + } + + @Test(timeout=300_000) + fun `PublishSubject unsubscribes ResilientSubscribers only upon explicitly calling onError`() { + var heartBeat = 0 + val source = PublishSubject.create() + source.continueOnError().subscribe { heartBeat += it } + source.continueOnError().subscribe { heartBeat += it } + source.onNext(1) + // send an onError event + assertFailsWith { + source.onError(IllegalStateException()) // all ResilientSubscribers under PublishSubject get unsubscribed here + } + source.onNext(1) + assertEquals(2, heartBeat) + } + + @Test(timeout=300_000) + fun `PublishSubject wrapped with a SafeSubscriber shuts down the whole structure, if one of them is unsafe and it throws`() { + var heartBeat = 0 + val source = PublishSubject.create() + source.unsafeSubscribe(Subscribers.create { runNo -> // subscribes unsafe; It does not wrap with ResilientSubscriber + heartBeat++ + if (runNo == 1) { + throw IllegalStateException() + } + }) + source.continueOnError().subscribe { heartBeat += it } + // wrapping PublishSubject with a SafeSubscriber + val sourceWrapper = SafeSubscriber(Subscribers.from(source)) + assertFailsWith { + sourceWrapper.onNext(1) + } + sourceWrapper.onNext(2) + assertEquals(1, heartBeat) + } + + /** + * A [ResilientSubscriber] that is NOT a leaf in a subscribers structure will not call [onError] + * if an error occurs during its [onNext] event processing. + * + * The reason why it should not call its onError is: if it wraps a [PublishSubject], calling [ResilientSubscriber.onError] + * will then call [PublishSubject.onError] which will shut down all the subscribers under the [PublishSubject]. + */ + @Test(timeout=300_000) + fun `PublishSubject wrapped with a ResilientSubscriber will preserve the structure, if one of its children subscribers is unsafe and it throws`() { + var heartBeat = 0 + val source = PublishSubject.create() + source.unsafeSubscribe(Subscribers.create { runNo -> + heartBeat++ + if (runNo == 1) { + throw IllegalStateException() + } + }) + source.continueOnError().subscribe { heartBeat++ } + // wrap PublishSubject with a ResilientSubscriber + val sourceWrapper = ResilientSubscriber(Subscribers.from(source)) + assertFailsWith("Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped") { + sourceWrapper.onNext(1) + } + sourceWrapper.onNext(2) + assertEquals(3, heartBeat) + } + + @Test(timeout=300_000) + fun `throwing inside onNext of a ResilientSubscriber leaf subscriber will call onError`() { + var heartBeatOnNext = 0 + var heartBeatOnError = 0 + val source = PublishSubject.create() + // add a leaf ResilientSubscriber + source.continueOnError().subscribe({ + heartBeatOnNext++ + throw IllegalStateException() + }, { + heartBeatOnError++ + }) + + source.onNext(1) + source.onNext(1) + assertEquals(2, heartBeatOnNext) + assertEquals(2, heartBeatOnError) + } + + /** + * In this test ResilientSubscriber throws an OnNextFailedException which is a OnErrorNotImplementedException. + * Because its underlying subscriber is not an ActionSubscriber, it will not be considered as a leaf ResilientSubscriber. + */ + @Test(timeout=300_000) + fun `throwing ResilientSubscriber at onNext will wrap with a Rx OnErrorNotImplementedException`() { + val resilientSubscriber = ResilientSubscriber(Subscribers.create { throw IllegalStateException() }) + assertFailsWith { // actually fails with an OnNextFailedException + resilientSubscriber.onNext(1) + } + } + + @Test(timeout=300_000) + fun `throwing inside ResilientSubscriber onError will wrap with a Rx OnErrorFailedException`() { + val resilientSubscriber = ResilientSubscriber( + ActionSubscriber( + { throw IllegalStateException() }, + { throw IllegalStateException() }, + null + ) + ) + assertFailsWith { + resilientSubscriber.onNext(1) + } + } + + /** + * In this test we create a chain of Subscribers with this the following order: + * ResilientSubscriber_X -> PublishSubject -> ResilientSubscriber_Y + * + * ResilientSubscriber_Y.onNext throws an error, since ResilientSubscriber_Y.onError is not defined, + * it will throw a OnErrorNotImplementedException. Then it will be propagated back until ResilientSubscriber_X. + * ResilientSubscriber_X will identify it is a not leaf subscriber and therefore will rethrow it as OnNextFailedException. + */ + @Test(timeout=300_000) + fun `propagated Rx exception will be rethrown at ResilientSubscriber onError`() { + val source = PublishSubject.create() + source.continueOnError().subscribe { throw IllegalStateException("123") } // will give a leaf ResilientSubscriber + val sourceWrapper = ResilientSubscriber(Subscribers.from(source)) // will give an inner ResilientSubscriber + + assertFailsWith("Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped") { + // IllegalStateException will be wrapped and rethrown as a OnErrorNotImplementedException in leaf ResilientSubscriber, + // will be caught by inner ResilientSubscriber and just be rethrown + sourceWrapper.onNext(1) + } + } + + @Test(timeout=300_000) + fun `test OnResilientSubscribe strictMode = true replaces SafeSubscriber subclass`() { + var heartBeat = 0 + val customSafeSubscriber = CustomSafeSubscriber( + Subscribers.create { + heartBeat++ + throw IllegalArgumentException() + }) + + val source = PublishSubject.create() + source.continueOnError().subscribe(customSafeSubscriber) // it should replace CustomSafeSubscriber with ResilientSubscriber + + assertFailsWith { source.onNext(1) } + assertFailsWith { source.onNext(1) } + assertEquals(2, heartBeat) + } + + @Test(timeout=300_000) + fun `test OnResilientSubscribe strictMode = false will not replace SafeSubscriber subclass`() { + var heartBeat = 0 + val customSafeSubscriber = CustomSafeSubscriber( + Subscribers.create { + heartBeat++ + throw IllegalArgumentException() + }) + + val source = PublishSubject.create() + source.resilientOnError().subscribe(customSafeSubscriber) // it should not replace CustomSafeSubscriber with ResilientSubscriber + + assertFailsWith { source.onNext(1) } + source.onNext(1) + assertEquals(1, heartBeat) + } + @Test(timeout=300_000) fun `combine tee and bufferUntilDatabaseCommit`() { val database = createDatabase() @@ -359,4 +565,6 @@ class ObservablesTests { subscription3.unsubscribe() } + + class CustomSafeSubscriber(actual: Subscriber): SafeSubscriber(actual) } \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt index c344badebb..a9b3b45dce 100644 --- a/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt @@ -1,12 +1,13 @@ package com.r3.dbfailure.contracts import com.r3.dbfailure.schemas.DbFailureSchemaV1 +import net.corda.core.contracts.CommandAndState import net.corda.core.contracts.CommandData import net.corda.core.contracts.Contract import net.corda.core.contracts.LinearState +import net.corda.core.contracts.OwnableState import net.corda.core.contracts.UniqueIdentifier import net.corda.core.identity.AbstractParty -import net.corda.core.identity.Party import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState @@ -21,23 +22,31 @@ class DbFailureContract : Contract { class TestState( override val linearId: UniqueIdentifier, - val particpant: Party, + override val participants: List, val randomValue: String?, - val errorTarget: Int = 0 - ) : LinearState, QueryableState { + val errorTarget: Int = 0, + override val owner: AbstractParty + ) : LinearState, QueryableState, OwnableState { - override val participants: List = listOf(particpant) override fun supportedSchemas(): Iterable = listOf(DbFailureSchemaV1) override fun generateMappedObject(schema: MappedSchema): PersistentState { return if (schema is DbFailureSchemaV1){ - DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id) + DbFailureSchemaV1.PersistentTestState( participants.toString(), randomValue, errorTarget, linearId.id) } else { throw IllegalArgumentException("Unsupported schema $schema") } } + + override fun withNewOwner(newOwner: AbstractParty): CommandAndState { + return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, this.errorTarget, newOwner)) + } + + fun withNewOwnerAndErrorTarget(newOwner: AbstractParty, errorTarget: Int): CommandAndState { + return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, errorTarget, newOwner)) + } } override fun verify(tx: LedgerTransaction) { @@ -46,5 +55,6 @@ class DbFailureContract : Contract { interface Commands : CommandData{ class Create: Commands + class Send : Commands } } \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt index 98dc5e201d..af1d9a20bd 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt @@ -20,14 +20,16 @@ object CreateStateFlow { // 1000s control exception handlling in the service/vault listener enum class ErrorTarget(val targetNumber: Int) { NoError(0), - ServiceSqlSyntaxError(1), - ServiceNullConstraintViolation(2), - ServiceValidUpdate(3), - ServiceReadState(4), - ServiceCheckForState(5), - ServiceThrowInvalidParameter(6), - ServiceThrowMotherOfAllExceptions(7), - ServiceThrowUnrecoverableError(8), + ServiceSqlSyntaxError(10000), + ServiceNullConstraintViolation(20000), + ServiceValidUpdate(30000), + ServiceReadState(40000), + ServiceCheckForState(50000), + ServiceThrowInvalidParameter(60000), + ServiceThrowMotherOfAllExceptions(70000), + ServiceThrowUnrecoverableError(80000), + ServiceSqlSyntaxErrorOnConsumed(90000), + ServiceConstraintViolationException(1000000), TxInvalidState(10), FlowSwallowErrors(100), ServiceSwallowErrors(1000) @@ -40,7 +42,7 @@ object CreateStateFlow { private val targetMap = ErrorTarget.values().associateBy(ErrorTarget::targetNumber) fun getServiceTarget(target: Int?): ErrorTarget { - return target?.let { targetMap.getValue(it % 10) } ?: CreateStateFlow.ErrorTarget.NoError + return target?.let { targetMap.getValue(((it/10000) % 1000)*10000) } ?: CreateStateFlow.ErrorTarget.NoError } fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget { @@ -69,10 +71,11 @@ object CreateStateFlow { val txTarget = getTxTarget(errorTarget) logger.info("Test flow: The tx error target is $txTarget") val state = DbFailureContract.TestState( - UniqueIdentifier(), - ourIdentity, - if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, - errorTarget) + UniqueIdentifier(), + listOf(ourIdentity), + if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, + errorTarget, ourIdentity + ) val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey) logger.info("Test flow: tx builder") diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt index af14b50307..d28f9f9bd1 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt @@ -4,33 +4,145 @@ import com.r3.dbfailure.contracts.DbFailureContract import net.corda.core.contracts.ContractState import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.Vault import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger +import org.hibernate.exception.ConstraintViolationException +import rx.Subscriber +import rx.observers.SafeSubscriber +import rx.observers.Subscribers +import java.lang.IllegalStateException import java.security.InvalidParameterException +import java.sql.SQLException @CordaService class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { companion object { val log = contextLogger() + var onError: ((Throwable) -> Unit)? = null // make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm) var throwUnrecoverableError = false + + var safeSubscription = true + var withCustomSafeSubscriber = false + + var onNextVisited: (Party) -> Unit = {} + var onErrorVisited: ((Party) -> Unit)? = null } init { val onNext: (Vault.Update) -> Unit = - { (_, produced) -> - produced.forEach { - val contractState = it.state.data as? DbFailureContract.TestState - @Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions - try { + { (consumed, produced) -> + + onNextVisited(services.myInfo.legalIdentities.first()) + + produced.forEach { + val contractState = it.state.data as? DbFailureContract.TestState + @Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions + try { + when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) { + CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> { + log.info("Fail with syntax error on raw statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "BLAAA RANDOM_VALUE = NULL\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> { + log.info("Fail with null constraint violation on raw statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "SET RANDOM_VALUE = NULL\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceValidUpdate -> { + log.info("Update current statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceReadState -> { + log.info("Read current state from db") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "SELECT * FROM FAIL_TEST_STATES \n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceCheckForState -> { + log.info("Check for currently written state in the db") + val session = services.jdbcSession() + val statement = session.createStatement() + val rs = statement.executeQuery( + "SELECT COUNT(*) FROM FAIL_TEST_STATES \n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0 + log.info( + "Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " + + "TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}" + ) + } + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> { + log.info("Throw InvalidParameterException") + throw InvalidParameterException("Toys out of pram") + } + CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> { + log.info("Throw Exception") + throw Exception("Mother of all exceptions") + } + CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> { + // this bit of code should only work in a OutOfProcess node, + // otherwise it will kill the testing jvm (including the testing thread) + if (throwUnrecoverableError) { + log.info("Throw Unrecoverable error") + throw OutOfMemoryError("Unrecoverable error") + } + } + CreateStateFlow.ErrorTarget.ServiceConstraintViolationException -> { + log.info("Throw ConstraintViolationException") + throw ConstraintViolationException("Dummy Hibernate Exception ", SQLException(), " Will cause flow retry!") + } + else -> { + // do nothing, everything else must be handled elsewhere + } + } + } catch (t: Throwable) { + if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) + == CreateStateFlow.ErrorTarget.ServiceSwallowErrors + ) { + log.warn("Service not letting errors escape", t) + } else { + throw t + } + } + } + consumed.forEach { + val contractState = it.state.data as? DbFailureContract.TestState + log.info("Test Service: Got state ${if (contractState == null) "null" else " test state with error target ${contractState.errorTarget}"}") when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) { - CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> { + CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed -> { log.info("Fail with syntax error on raw statement") val session = services.jdbcSession() val statement = session.createStatement() @@ -41,85 +153,33 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { ) log.info("SQL result: ${statement.resultSet}") } - CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> { - log.info("Fail with null constraint violation on raw statement") - val session = services.jdbcSession() - val statement = session.createStatement() - statement.execute( - "UPDATE FAIL_TEST_STATES \n" + - "SET RANDOM_VALUE = NULL\n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - log.info("SQL result: ${statement.resultSet}") - } - CreateStateFlow.ErrorTarget.ServiceValidUpdate -> { - log.info("Update current statement") - val session = services.jdbcSession() - val statement = session.createStatement() - statement.execute( - "UPDATE FAIL_TEST_STATES \n" + - "SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - log.info("SQL result: ${statement.resultSet}") - } - CreateStateFlow.ErrorTarget.ServiceReadState -> { - log.info("Read current state from db") - val session = services.jdbcSession() - val statement = session.createStatement() - statement.execute( - "SELECT * FROM FAIL_TEST_STATES \n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - log.info("SQL result: ${statement.resultSet}") - } - CreateStateFlow.ErrorTarget.ServiceCheckForState -> { - log.info("Check for currently written state in the db") - val session = services.jdbcSession() - val statement = session.createStatement() - val rs = statement.executeQuery( - "SELECT COUNT(*) FROM FAIL_TEST_STATES \n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0 - log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " + - "TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}") - } - CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> { - log.info("Throw InvalidParameterException") - throw InvalidParameterException("Toys out of pram") - } - CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> { - log.info("Throw Exception") - throw Exception("Mother of all exceptions") - } - CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> { - // this bit of code should only work in a OutOfProcess node, - // otherwise it will kill the testing jvm (including the testing thread) - if (throwUnrecoverableError) { - log.info("Throw Unrecoverable error") - throw OutOfMemoryError("Unrecoverable error") - } - } else -> { // do nothing, everything else must be handled elsewhere } } - } catch (t: Throwable) { - if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) - == CreateStateFlow.ErrorTarget.ServiceSwallowErrors) { - log.warn("Service not letting errors escape", t) - } else { - throw t - } } } - } if (onError != null) { - services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined + val onErrorWrapper: ((Throwable) -> Unit)? = { + onErrorVisited?.let { + it(services.myInfo.legalIdentities.first()) + } + onError!!(it) + } + services.vaultService.rawUpdates.subscribe(onNext, onErrorWrapper) // onError is defined + } else if (onErrorVisited != null) { + throw IllegalStateException("A DbListenerService.onError needs to be defined!") } else { - services.vaultService.rawUpdates.subscribe(onNext) + if (safeSubscription) { + if (withCustomSafeSubscriber) { + services.vaultService.rawUpdates.subscribe(CustomSafeSubscriber(Subscribers.create(onNext))) + } else { + services.vaultService.rawUpdates.subscribe(onNext) + } + } else { + services.vaultService.rawUpdates.unsafeSubscribe(Subscribers.create(onNext)) + } } } @@ -130,4 +190,6 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { throwUnrecoverableError = true } } + + class CustomSafeSubscriber(actual: Subscriber): SafeSubscriber(actual) } \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt new file mode 100644 index 0000000000..f1b02a9729 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt @@ -0,0 +1,88 @@ +package com.r3.dbfailure.workflows + +import co.paralleluniverse.fibers.Suspendable +import com.r3.dbfailure.contracts.DbFailureContract +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.unwrap + +object SendStateFlow { + + /** + * Creates a [DbFailureContract.TestState], signs it, collects a signature from a separate node and then calls [FinalityFlow] flow. + * Can throw in various stages + */ + @StartableByRPC + @InitiatingFlow + class PassErroneousOwnableState(private val stateId: UniqueIdentifier, private val errorTarget: Int, private val counterParty: Party) : + FlowLogic() { + + @Suspendable + override fun call() { + logger.info("Test flow: starting") + val notary = serviceHub.networkMapCache.notaryIdentities[0] + logger.info("Test flow: create counterparty session") + val recipientSession = initiateFlow(counterParty) + + val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(stateId), status = Vault.StateStatus.UNCONSUMED) + val inputState = serviceHub.vaultService.queryBy(DbFailureContract.TestState::class.java, queryCriteria).states.singleOrNull() + ?: throw FlowException("Failed to find single state for linear id $stateId") + + logger.info("Test flow: tx builder") + val commandAndState = inputState.state.data.withNewOwnerAndErrorTarget(counterParty, errorTarget) + val txBuilder = TransactionBuilder(notary) + .addInputState(inputState) + .addOutputState(commandAndState.ownableState) + .addCommand(commandAndState.command, listOf(ourIdentity.owningKey, counterParty.owningKey)) + + + logger.info("Test flow: verify") + txBuilder.verify(serviceHub) + + val signedTx = serviceHub.signInitialTransaction(txBuilder) + + logger.info("Test flow: send for counterparty signing") + recipientSession.send(signedTx) + logger.info("Test flow: Waiting to receive counter signed transaction") + val counterSignedTx = recipientSession.receive().unwrap { it } + logger.info("Test flow: Received counter sigend transaction, invoking finality") + subFlow(FinalityFlow(counterSignedTx, recipientSession)) + + logger.info("Test flow: Finishing") + } + } + + @InitiatedBy(PassErroneousOwnableState::class) + class PassErroneousOwnableStateReceiver(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + logger.info("Test flow counterparty: starting") + val signedTx = otherSide.receive().unwrap { it } + logger.info("Test flow counterparty: received TX, signing") + val counterSignedTx = serviceHub.addSignature(signedTx) + logger.info("Test flow counterparty: calling hookBeforeCounterPartyAnswers") + logger.info("Test flow counterparty: Answer with countersigned transaction") + otherSide.send(counterSignedTx) + logger.info("Test flow counterparty: calling hookAfterCounterPartyAnswers") + // Not ideal that we have to do this check, but we must as FinalityFlow does not send locally + if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) { + logger.info("Test flow counterparty: Waiting for finality") + subFlow(ReceiveFinalityFlow(otherSide)) + } + logger.info("Test flow counterparty: Finishing") + } + } + +} \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt index edabddc0df..7a3ea10d82 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt @@ -30,9 +30,10 @@ object ErrorHandling { val txTarget = CreateStateFlow.getTxTarget(errorTarget) val state = DbFailureContract.TestState( UniqueIdentifier(), - ourIdentity, + listOf(ourIdentity), if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value", - errorTarget) + errorTarget, + ourIdentity) val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey) val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand) val signedTx = serviceHub.signInitialTransaction(txBuilder) @@ -50,5 +51,4 @@ object ErrorHandling { hookAfterSecondCheckpoint.invoke() // should be never executed } } - } \ No newline at end of file