From 97ec3a18b1f7add016e4fe2a37fd43ad0e76915c Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Thu, 13 Feb 2020 13:54:40 +0000 Subject: [PATCH] Make rawUpdates Rx.Observers not unsubscribe when accessed from CordaServices - Do not allow rawUpdates subscribing from flows --- .../corda/core/utilities/FlowSafeSubject.kt | 13 + .../vault/VaultObserverExceptionTest.kt | 430 +++++++++++++++++- .../node/services/vault/NodeVaultService.kt | 20 +- .../dbfailure/contracts/DbFailureContract.kt | 22 +- .../r3/dbfailure/workflows/CreateStateFlow.kt | 10 +- .../dbfailure/workflows/DbListenerService.kt | 193 ++++---- .../r3/dbfailure/workflows/SendStateFlow.kt | 101 ++++ .../workflows/ErrorHandling.kt | 16 +- 8 files changed, 689 insertions(+), 116 deletions(-) create mode 100644 testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt diff --git a/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt b/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt index 4d38fbd3a2..900dcbdcf6 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/FlowSafeSubject.kt @@ -26,6 +26,19 @@ class FlowSafeSubject(private val actual: Subject) : Observer by } }) { + override fun hasObservers(): Boolean { + return actual.hasObservers() + } +} + +/** + * The [PreventSubscriptionsSubject] is used to prevent any subscriptions to a [Subject]. + */ +class PreventSubscriptionsSubject(private val actual: Subject, errorAction: () -> Unit) : Observer by actual, + Subject(OnSubscribe { _ -> + errorAction() + }) { + override fun hasObservers(): Boolean { return actual.hasObservers() } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index fb96c3b80b..93bb394416 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -1,34 +1,47 @@ package net.corda.node.services.vault import co.paralleluniverse.strands.concurrent.Semaphore +import com.r3.dbfailure.contracts.DbFailureContract import com.r3.dbfailure.workflows.CreateStateFlow -import com.r3.dbfailure.workflows.CreateStateFlow.Initiator import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum import com.r3.dbfailure.workflows.DbListenerService import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow +import com.r3.dbfailure.workflows.SendStateFlow import com.r3.transactionfailure.workflows.ErrorHandling import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow +import net.corda.core.CordaRuntimeException +import net.corda.core.contracts.StateAndRef +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.identity.Party import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.startFlow +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.Permissions import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.node.User import net.corda.testing.node.internal.findCordapp +import org.assertj.core.api.Assertions import org.junit.After import org.junit.Assert import org.junit.Test import java.lang.IllegalStateException import java.sql.SQLException +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException import javax.persistence.PersistenceException +import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue @@ -50,6 +63,8 @@ class VaultObserverExceptionTest { StaffedFlowHospital.onFlowAdmitted.clear() DbListenerService.onError = null DbListenerService.safeSubscription = true + DbListenerService.onNextVisited = {} + DbListenerService.onErrorVisited = null } /** @@ -75,9 +90,9 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() aliceNode.rpc.startFlow( - ::Initiator, - "Syntax Error in Custom SQL", - CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) + CreateStateFlow::Initiator, + "Syntax Error in Custom SQL", + CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) ).returnValue.then { testControlFuture.complete(false) } val foundExpectedException = testControlFuture.getOrThrow(30.seconds) @@ -109,7 +124,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() aliceNode.rpc.startFlow( - ::Initiator, + CreateStateFlow::Initiator, "Syntax Error in Custom SQL", CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError) ).returnValue.then { testControlFuture.complete(false) } @@ -138,7 +153,7 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) waitUntilHospitalised.acquire() // wait here until flow gets hospitalised @@ -166,9 +181,9 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum( - CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, - CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, + CreateStateFlow.ErrorTarget.FlowSwallowErrors)) waitUntilHospitalised.acquire() // wait here until flow gets hospitalised } @@ -202,7 +217,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() assertFailsWith("PersistenceException") { - aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum( + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", errorTargetsToNum( CreateStateFlow.ErrorTarget.TxInvalidState)) .returnValue.getOrThrow(30.seconds) } @@ -235,7 +250,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() val flowHandle = aliceNode.rpc.startFlow( - ::Initiator, "EntityManager", + CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.TxInvalidState, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) @@ -263,7 +278,7 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError, CreateStateFlow.ErrorTarget.FlowSwallowErrors)) val flowResult = flowHandle.returnValue @@ -286,7 +301,7 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( + val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError, CreateStateFlow.ErrorTarget.ServiceSwallowErrors)) val flowResult = flowHandle.returnValue @@ -316,20 +331,20 @@ class VaultObserverExceptionTest { } driver(DriverParameters( - inMemoryDB = false, - startNodesInProcess = true, - isDebug = true, - cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), - findCordapp("com.r3.dbfailure.workflows"), - findCordapp("com.r3.transactionfailure.workflows"), - findCordapp("com.r3.dbfailure.schemas")))) { + inMemoryDB = false, + startNodesInProcess = true, + isDebug = true, + cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.transactionfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas")))) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception CreateStateFlow.ErrorTarget.FlowSwallowErrors - ) + ) ) waitUntilHospitalised.acquire() @@ -364,9 +379,9 @@ class VaultObserverExceptionTest { cordappsForAllNodes = testCordapps())) { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( - CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, - CreateStateFlow.ErrorTarget.FlowSwallowErrors)) + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum( + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter, + CreateStateFlow.ErrorTarget.FlowSwallowErrors)) waitUntilHospitalised.acquire() // wait here until flow gets hospitalised } @@ -379,7 +394,7 @@ class VaultObserverExceptionTest { val aliceUser = User("user", "foo", setOf(Permissions.all())) val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow() aliceNode.rpc.startFlow(::MakeServiceThrowErrorFlow).returnValue.getOrThrow() - aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum( + aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum( CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError)) val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS) @@ -398,4 +413,369 @@ class VaultObserverExceptionTest { } } + /** + * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction + * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the + * counterparty node. + * + * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe. + * + * Check onNext is visited the correct number of times. + * + * This test causes 2 failures inside of the observer to ensure that the observer is still subscribed. + */ + @Test + fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext check`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ),inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenConsumingState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed) + ).returnValue.getOrThrow(30.seconds) + + println("Created new state") + + val flowHandle = aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, // throws at consumed state -> should end up in hospital -> flow should hang + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError), + bobNode.nodeInfo.legalIdentities.first() + ) + + Assertions.assertThatExceptionOfType(TimeoutException::class.java) + .isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) } + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + println("First set of flows") + val stateId = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + + println("Second set of flows") + val stateId2 = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(2, observationCounter) + assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + } + } + + /** + * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction + * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the + * counterparty node. + * + * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe. + * + * Check onNext and onError are visited the correct number of times. + * + * This test causes 2 failures inside of the observer to ensure that the observer is still subscribed. + */ + @Test + fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext and onError check`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it} + DbListenerService.onErrorVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenConsumingState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + // should be a hospital exception + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed) + ).returnValue.getOrThrow(30.seconds) + + val flowHandle = aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError), + bobNode.nodeInfo.legalIdentities.first() + ) + + Assertions.assertThatExceptionOfType(TimeoutException::class.java) + .isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) } + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + val stateId = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + + val stateId2 = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(2, observationCounter) + assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + } + } + + /** + * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the counterparty node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. + * Observer events are recorded on both the initiating node and the counterparty node. + * + * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe. + * + * This test causes 2 failures inside of the observer to ensure that the observer is still subscribed. + */ + @Test + fun `Throw user error in VaultService rawUpdates during counterparty FinalityFlow blows up the flow but does not break the Observer`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenCreatingSecondState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError) + ).returnValue.getOrThrow(30.seconds) + + aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError), + bobNode.nodeInfo.legalIdentities.first() + ).returnValue.getOrThrow(20.seconds) + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + val stateId = startErrorInObservableWhenCreatingSecondState() + assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(1, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()]) + + val stateId2 = startErrorInObservableWhenCreatingSecondState() + assertEquals(1, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.CONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(2, observationCounter) + assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(2, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()]) + } + } + + /** + * An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node. + * + * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction + * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the + * counterparty node. + * + * More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe. + * + * This test causes 2 failures inside of the [rx.Observer] to ensure that the Observer is still subscribed. + */ + @Test + fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer`() { + var observationCounter = 0 + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + + val rawUpdatesCount = ConcurrentHashMap() + DbListenerService.onNextVisited = { party -> + if (rawUpdatesCount.putIfAbsent(party, 1) != null) { + rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 } + } + } + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() + val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() + + val startErrorInObservableWhenConsumingState = { + + val stateId = aliceNode.rpc.startFlow( + CreateStateFlow::Initiator, + "AllGood", + errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed) + ).returnValue.getOrThrow(30.seconds) + + val flowHandle = aliceNode.rpc.startFlow( + SendStateFlow::PassErroneousOwnableState, + stateId, + errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError), + bobNode.nodeInfo.legalIdentities.first() + ) + + Assertions.assertThatExceptionOfType(TimeoutException::class.java) + .isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) } + + stateId + } + + assertEquals(0, notary.getNotarisedTransactionIds().size) + + val stateId = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) + assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) + assertEquals(1, notary.getNotarisedTransactionIds().size) + assertEquals(1, observationCounter) + assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + + val stateId2 = startErrorInObservableWhenConsumingState() + assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size) + assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size) + assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) + assertEquals(2, notary.getNotarisedTransactionIds().size) + assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) + assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0)) + } + } + + @Test + fun `Subscribing to NodeVaultService rawUpdates from a flow is not allowed` () { + + val user = User("user", "foo", setOf(Permissions.all())) + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = listOf( + findCordapp("com.r3.dbfailure.contracts"), + findCordapp("com.r3.dbfailure.workflows"), + findCordapp("com.r3.dbfailure.schemas") + ), + inMemoryDB = false) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + val future = aliceNode.rpc.startFlow(ErrorHandling::SubscribingRawUpdatesFlow).returnValue + + assertFailsWith("Cannot subscribe to NodeVaultService.rawUpdates from a flow!") { + future.getOrThrow(30.seconds) + } + } + } + + //TODO add retry from checkpoint test + @Test + fun `Failing Observer wrapped with FlowSafeSubscriber will remain and re-called upon flow retry`() { + + } + + private fun NodeHandle.getNotarisedTransactionIds(): List { + return rpc.startFlowDynamic(SendStateFlow.NotarisedTxs::class.java).returnValue.getOrThrow() + } + + private fun NodeHandle.getStatesById(id: UniqueIdentifier?, status: Vault.StateStatus): List> { + return rpc.vaultQueryByCriteria( + QueryCriteria.LinearStateQueryCriteria( + linearId = if (id != null) listOf(id) else null, + status = status + ), DbFailureContract.TestState::class.java + ).states + } + + private fun NodeHandle.getAllStates(status: Vault.StateStatus): List> { + return getStatesById(null, status) + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index d68d558cd1..fa4bc5106c 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -5,6 +5,7 @@ import co.paralleluniverse.strands.Strand import net.corda.core.contracts.* import net.corda.core.crypto.SecureHash import net.corda.core.crypto.containsAny +import net.corda.core.flows.FlowException import net.corda.core.flows.HospitalizeFlowException import net.corda.core.internal.* import net.corda.core.messaging.DataFeed @@ -209,7 +210,24 @@ class NodeVaultService( } override val rawUpdates: Observable> - get() = mutex.locked { _rawUpdatesPublisher } + get() = mutex.locked { + if (FlowStateMachineImpl.currentStateMachine() != null) { + // we are inside a flow! we cannot allow flows to subscribe observers, + // because if a flow adds a subscriber; if the observer under the subscriber holds references to + // flow's properties, essentially to fiber's properties then, since it does not unsubscribes on flow's/ fiber's completion, + // it could prevent the flow/ fiber swapped our of memory. + PreventSubscriptionsSubject(_rawUpdatesPublisher) { + log.error("Cannot subscribe to NodeVaultService.rawUpdates from a flow! " + + "- hospitalising the flow ") + + throw FlowException("Cannot subscribe to NodeVaultService.rawUpdates from a flow! ") + } + } else { + // we are not inside a flow; we are most likely inside a CordaService, + // we will wrap with 'safe subscriptions' here, namely add -not unsubscribing- subscribers. + FlowSafeSubject(_rawUpdatesPublisher) + } + } override val updates: Observable> get() = mutex.locked { _updatesInDbTx } diff --git a/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt index c344badebb..a9b3b45dce 100644 --- a/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt +++ b/testing/cordapps/dbfailure/dbfcontracts/src/main/kotlin/com/r3/dbfailure/contracts/DbFailureContract.kt @@ -1,12 +1,13 @@ package com.r3.dbfailure.contracts import com.r3.dbfailure.schemas.DbFailureSchemaV1 +import net.corda.core.contracts.CommandAndState import net.corda.core.contracts.CommandData import net.corda.core.contracts.Contract import net.corda.core.contracts.LinearState +import net.corda.core.contracts.OwnableState import net.corda.core.contracts.UniqueIdentifier import net.corda.core.identity.AbstractParty -import net.corda.core.identity.Party import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentState import net.corda.core.schemas.QueryableState @@ -21,23 +22,31 @@ class DbFailureContract : Contract { class TestState( override val linearId: UniqueIdentifier, - val particpant: Party, + override val participants: List, val randomValue: String?, - val errorTarget: Int = 0 - ) : LinearState, QueryableState { + val errorTarget: Int = 0, + override val owner: AbstractParty + ) : LinearState, QueryableState, OwnableState { - override val participants: List = listOf(particpant) override fun supportedSchemas(): Iterable = listOf(DbFailureSchemaV1) override fun generateMappedObject(schema: MappedSchema): PersistentState { return if (schema is DbFailureSchemaV1){ - DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id) + DbFailureSchemaV1.PersistentTestState( participants.toString(), randomValue, errorTarget, linearId.id) } else { throw IllegalArgumentException("Unsupported schema $schema") } } + + override fun withNewOwner(newOwner: AbstractParty): CommandAndState { + return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, this.errorTarget, newOwner)) + } + + fun withNewOwnerAndErrorTarget(newOwner: AbstractParty, errorTarget: Int): CommandAndState { + return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, errorTarget, newOwner)) + } } override fun verify(tx: LedgerTransaction) { @@ -46,5 +55,6 @@ class DbFailureContract : Contract { interface Commands : CommandData{ class Create: Commands + class Send : Commands } } \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt index 98dc5e201d..c24bd5a07e 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/CreateStateFlow.kt @@ -28,6 +28,7 @@ object CreateStateFlow { ServiceThrowInvalidParameter(6), ServiceThrowMotherOfAllExceptions(7), ServiceThrowUnrecoverableError(8), + ServiceSqlSyntaxErrorOnConsumed(9), TxInvalidState(10), FlowSwallowErrors(100), ServiceSwallowErrors(1000) @@ -69,10 +70,11 @@ object CreateStateFlow { val txTarget = getTxTarget(errorTarget) logger.info("Test flow: The tx error target is $txTarget") val state = DbFailureContract.TestState( - UniqueIdentifier(), - ourIdentity, - if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, - errorTarget) + UniqueIdentifier(), + listOf(ourIdentity), + if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue, + errorTarget, ourIdentity + ) val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey) logger.info("Test flow: tx builder") diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt index df8445c8e0..128b003d71 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/DbListenerService.kt @@ -4,12 +4,14 @@ import com.r3.dbfailure.contracts.DbFailureContract import net.corda.core.contracts.ContractState import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.Vault import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import rx.observers.Subscribers +import java.lang.IllegalStateException import java.security.InvalidParameterException @CordaService @@ -17,22 +19,120 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { companion object { val log = contextLogger() + var onError: ((Throwable) -> Unit)? = null // make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm) var throwUnrecoverableError = false var safeSubscription = true + + var onNextVisited: (Party) -> Unit = {} + var onErrorVisited: ((Party) -> Unit)? = null } init { val onNext: (Vault.Update) -> Unit = - { (_, produced) -> - produced.forEach { - val contractState = it.state.data as? DbFailureContract.TestState - @Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions - try { + { (consumed, produced) -> + + onNextVisited(services.myInfo.legalIdentities.first()) + + produced.forEach { + val contractState = it.state.data as? DbFailureContract.TestState + @Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions + try { + when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) { + CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> { + log.info("Fail with syntax error on raw statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "BLAAA RANDOM_VALUE = NULL\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> { + log.info("Fail with null constraint violation on raw statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "SET RANDOM_VALUE = NULL\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceValidUpdate -> { + log.info("Update current statement") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "UPDATE FAIL_TEST_STATES \n" + + "SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceReadState -> { + log.info("Read current state from db") + val session = services.jdbcSession() + val statement = session.createStatement() + statement.execute( + "SELECT * FROM FAIL_TEST_STATES \n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + log.info("SQL result: ${statement.resultSet}") + } + CreateStateFlow.ErrorTarget.ServiceCheckForState -> { + log.info("Check for currently written state in the db") + val session = services.jdbcSession() + val statement = session.createStatement() + val rs = statement.executeQuery( + "SELECT COUNT(*) FROM FAIL_TEST_STATES \n" + + "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" + ) + val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0 + log.info( + "Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " + + "TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}" + ) + } + CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> { + log.info("Throw InvalidParameterException") + throw InvalidParameterException("Toys out of pram") + } + CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> { + log.info("Throw Exception") + throw Exception("Mother of all exceptions") + } + CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> { + // this bit of code should only work in a OutOfProcess node, + // otherwise it will kill the testing jvm (including the testing thread) + if (throwUnrecoverableError) { + log.info("Throw Unrecoverable error") + throw OutOfMemoryError("Unrecoverable error") + } + } + else -> { + // do nothing, everything else must be handled elsewhere + } + } + } catch (t: Throwable) { + if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) + == CreateStateFlow.ErrorTarget.ServiceSwallowErrors + ) { + log.warn("Service not letting errors escape", t) + } else { + throw t + } + } + } + consumed.forEach { + val contractState = it.state.data as? DbFailureContract.TestState + log.info("Test Service: Got state ${if (contractState == null) "null" else " test state with error target ${contractState.errorTarget}"}") when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) { - CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> { + CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed -> { log.info("Fail with syntax error on raw statement") val session = services.jdbcSession() val statement = session.createStatement() @@ -43,83 +143,20 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() { ) log.info("SQL result: ${statement.resultSet}") } - CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> { - log.info("Fail with null constraint violation on raw statement") - val session = services.jdbcSession() - val statement = session.createStatement() - statement.execute( - "UPDATE FAIL_TEST_STATES \n" + - "SET RANDOM_VALUE = NULL\n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - log.info("SQL result: ${statement.resultSet}") - } - CreateStateFlow.ErrorTarget.ServiceValidUpdate -> { - log.info("Update current statement") - val session = services.jdbcSession() - val statement = session.createStatement() - statement.execute( - "UPDATE FAIL_TEST_STATES \n" + - "SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - log.info("SQL result: ${statement.resultSet}") - } - CreateStateFlow.ErrorTarget.ServiceReadState -> { - log.info("Read current state from db") - val session = services.jdbcSession() - val statement = session.createStatement() - statement.execute( - "SELECT * FROM FAIL_TEST_STATES \n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - log.info("SQL result: ${statement.resultSet}") - } - CreateStateFlow.ErrorTarget.ServiceCheckForState -> { - log.info("Check for currently written state in the db") - val session = services.jdbcSession() - val statement = session.createStatement() - val rs = statement.executeQuery( - "SELECT COUNT(*) FROM FAIL_TEST_STATES \n" + - "WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};" - ) - val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0 - log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " + - "TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}") - } - CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> { - log.info("Throw InvalidParameterException") - throw InvalidParameterException("Toys out of pram") - } - CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> { - log.info("Throw Exception") - throw Exception("Mother of all exceptions") - } - CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> { - // this bit of code should only work in a OutOfProcess node, - // otherwise it will kill the testing jvm (including the testing thread) - if (throwUnrecoverableError) { - log.info("Throw Unrecoverable error") - throw OutOfMemoryError("Unrecoverable error") - } - } - else -> { - // do nothing, everything else must be handled elsewhere - } - } - } catch (t: Throwable) { - if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget) - == CreateStateFlow.ErrorTarget.ServiceSwallowErrors) { - log.warn("Service not letting errors escape", t) - } else { - throw t } } } - } if (onError != null) { - services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined + val onErrorWrapper: ((Throwable) -> Unit)? = { + onErrorVisited?.let { + it(services.myInfo.legalIdentities.first()) + } + onError!!(it) + } + services.vaultService.rawUpdates.subscribe(onNext, onErrorWrapper) // onError is defined + } else if (onErrorVisited != null) { + throw IllegalStateException("A DbListenerService.onError needs to be defined!") } else { if (safeSubscription) { services.vaultService.rawUpdates.subscribe(onNext) diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt new file mode 100644 index 0000000000..9f7bdd0941 --- /dev/null +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/dbfailure/workflows/SendStateFlow.kt @@ -0,0 +1,101 @@ +package com.r3.dbfailure.workflows + +import co.paralleluniverse.fibers.Suspendable +import com.r3.dbfailure.contracts.DbFailureContract +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.ReceiveFinalityFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.identity.Party +import net.corda.core.node.services.Vault +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.unwrap + +object SendStateFlow { + + /** + * Creates a [DbFailureContract.TestState], signs it, collects a signature from a separate node and then calls [FinalityFlow] flow. + * Can throw in various stages + */ + @StartableByRPC + @InitiatingFlow + class PassErroneousOwnableState(private val stateId: UniqueIdentifier, private val errorTarget: Int, private val counterParty: Party) : + FlowLogic() { + + @Suspendable + override fun call() { + logger.info("Test flow: starting") + val notary = serviceHub.networkMapCache.notaryIdentities[0] + logger.info("Test flow: create counterparty session") + val recipientSession = initiateFlow(counterParty) + + val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(stateId), status = Vault.StateStatus.UNCONSUMED) + val inputState = serviceHub.vaultService.queryBy(DbFailureContract.TestState::class.java, queryCriteria).states.singleOrNull() + ?: throw FlowException("Failed to find single state for linear id $stateId") + + logger.info("Test flow: tx builder") + val commandAndState = inputState.state.data.withNewOwnerAndErrorTarget(counterParty, errorTarget) + val txBuilder = TransactionBuilder(notary) + .addInputState(inputState) + .addOutputState(commandAndState.ownableState) + .addCommand(commandAndState.command, listOf(ourIdentity.owningKey, counterParty.owningKey)) + + + logger.info("Test flow: verify") + txBuilder.verify(serviceHub) + + val signedTx = serviceHub.signInitialTransaction(txBuilder) + + logger.info("Test flow: send for counterparty signing") + recipientSession.send(signedTx) + logger.info("Test flow: Waiting to receive counter signed transaction") + val counterSignedTx = recipientSession.receive().unwrap { it } + logger.info("Test flow: Received counter sigend transaction, invoking finality") + subFlow(FinalityFlow(counterSignedTx, recipientSession)) + + logger.info("Test flow: Finishing") + } + } + + @InitiatedBy(PassErroneousOwnableState::class) + class PassErroneousOwnableStateReceiver(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + logger.info("Test flow counterparty: starting") + val signedTx = otherSide.receive().unwrap { it } + logger.info("Test flow counterparty: received TX, signing") + val counterSignedTx = serviceHub.addSignature(signedTx) + logger.info("Test flow counterparty: calling hookBeforeCounterPartyAnswers") + logger.info("Test flow counterparty: Answer with countersigned transaction") + otherSide.send(counterSignedTx) + logger.info("Test flow counterparty: calling hookAfterCounterPartyAnswers") + // Not ideal that we have to do this check, but we must as FinalityFlow does not send locally + if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) { + logger.info("Test flow counterparty: Waiting for finality") + subFlow(ReceiveFinalityFlow(otherSide)) + } + logger.info("Test flow counterparty: Finishing") + } + } + + @StartableByRPC + class NotarisedTxs : FlowLogic>() { + override fun call(): List { + val session = serviceHub.jdbcSession() + val statement = session.createStatement() + statement.execute("SELECT TRANSACTION_ID FROM NODE_NOTARY_COMMITTED_TXS;") + val result = mutableListOf() + while (statement.resultSet.next()) { + result.add(statement.resultSet.getString(1)) + } + return result + } + } +} \ No newline at end of file diff --git a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt index edabddc0df..b11fea0cff 100644 --- a/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt +++ b/testing/cordapps/dbfailure/dbfworkflows/src/main/kotlin/com/r3/transactionfailure/workflows/ErrorHandling.kt @@ -30,9 +30,10 @@ object ErrorHandling { val txTarget = CreateStateFlow.getTxTarget(errorTarget) val state = DbFailureContract.TestState( UniqueIdentifier(), - ourIdentity, + listOf(ourIdentity), if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value", - errorTarget) + errorTarget, + ourIdentity) val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey) val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand) val signedTx = serviceHub.signInitialTransaction(txBuilder) @@ -51,4 +52,15 @@ object ErrorHandling { } } + @StartableByRPC + class SubscribingRawUpdatesFlow: FlowLogic() { + override fun call() { + val rawUpdates = serviceHub.vaultService.rawUpdates + logger.info("Accessing rawUpdates in a flow is fine! ") + rawUpdates.subscribe { + println("However, adding a subscription will make the flow fail!") + } + } + } + } \ No newline at end of file